Browse Source

investigation: Schema Registry producer sends InitProducerId with idempotence enabled

## Discovery

KafkaStore.java line 136:

When idempotence is enabled:
- Producer sends InitProducerId on creation
- This is NORMAL Kafka behavior

## Timeline

1. KafkaStore.init() creates producer with idempotence=true (line 138)
2. Producer sends InitProducerId request  (We handle this correctly)
3. Producer.initProducerId request completes successfully
4. Then KafkaStoreReaderThread created (line 142-145)
5. Reader thread constructor calls seekToBeginning() (line 183)
6. seekToBeginning() should send ListOffsets request
7. BUT nothing happens! Consumer blocks indefinitely

## Root Cause Analysis

The PRODUCER successfully sends/receives InitProducerId.
The CONSUMER fails at seekToBeginning() - never sends ListOffsets.

The consumer is stuck somewhere in the Java Kafka client seek logic,
possibly waiting for something related to the producer/idempotence setup.

OR: The ListOffsets request IS being sent by the consumer, but we're not seeing it
because it's being handled differently (data plane vs control plane routing).

## Next: Check if ListOffsets is being routed to data plane and never processed
pull/7329/head
chrislu 5 days ago
parent
commit
ad471d25ab
  1. 14
      weed/mq/kafka/protocol/handler.go

14
weed/mq/kafka/protocol/handler.go

@ -3921,6 +3921,14 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
// v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16)
// v4+: Uses flexible format with tagged fields
glog.Warningf("🔴🔴🔴 InitProducerId HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody))
maxBytes := len(requestBody)
if maxBytes > 64 {
maxBytes = 64
}
glog.Warningf("🔴🔴🔴 InitProducerId request first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes])
offset := 0
// Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions)
@ -4026,6 +4034,12 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
response = append(response, 0x00) // Empty response body tagged fields
}
glog.Warningf("🔴🔴🔴 InitProducerId HANDLER RETURNING: correlationID=%d, responseLen=%d bytes", correlationID, len(response))
respPreview := len(response)
if respPreview > 32 {
respPreview = 32
}
glog.Warningf("🔴🔴🔴 InitProducerId response (hex): %x", response[:respPreview])
return response, nil
}

Loading…
Cancel
Save