Browse Source

debug: Add raw request logging - CONSUMER STUCK IN SEEK LOOP

BREAKTHROUGH: Found the exact point where consumer hangs!

## Request Statistics
2049 × Metadata (apiKey=3) - Repeatedly sent
  22 × ApiVersions (apiKey=18)
   6 × DescribeCluster (apiKey=60)
   0 × ListOffsets (apiKey=2) - NEVER SENT
   0 × Fetch (apiKey=1) - NEVER SENT
   0 × Produce (apiKey=0) - NEVER SENT

## Consumer Initialization Sequence
 Consumer created successfully
 partitionsFor() succeeds - finds _schemas topic with 1 partition
 assign() called - assigns partition to consumer
 seekToBeginning() BLOCKS HERE - never sends ListOffsets
 Never reaches poll() loop

## Why Metadata is Requested 2049 Times

Consumer stuck in retry loop:
1. Get metadata → works
2. Assign partition → works
3. Try to seek → blocks indefinitely
4. Timeout on seek
5. Retry metadata to find alternate broker
6. Loop back to step 1

## The Real Issue

Java KafkaConsumer is stuck at seekToBeginning() but NOT sending
ListOffsets requests. This indicates a BROKER CONNECTIVITY ISSUE
during offset seeking phase.

Root causes to investigate:
1. Metadata response missing critical fields (cluster ID, controller ID)
2. Broker address unreachable for seeks
3. Consumer group coordination incomplete
4. Network connectivity issue specific to seek operations

The 2049 metadata requests prove consumer can communicate with
gateway, but something in the broker assignment prevents seeking.
pull/7329/head
chrislu 5 days ago
parent
commit
84842eb6e9
  1. 15
      weed/mq/kafka/protocol/handler.go

15
weed/mq/kafka/protocol/handler.go

@ -908,8 +908,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
apiKey := binary.BigEndian.Uint16(messageBuf[0:2])
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
glog.Warningf("🔴 REQUEST LOOP: Parsed apiKey=%d, apiVersion=%d, correlationID=%d from %s", apiKey, apiVersion, correlationID, connectionID)
// LOG EVERY SINGLE REQUEST REGARDLESS OF apiKey
maxLen := len(messageBuf)
if maxLen > 16 {
maxLen = 16
}
glog.Warningf("🔴 RAW REQUEST: correlationID=%d, firstTwoBytes=[%02x %02x] = apiKey=%d, first16bytes=%v", correlationID, messageBuf[0], messageBuf[1], apiKey, messageBuf[:maxLen])
// Validate API version against what we support
if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
@ -2884,10 +2889,10 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey,
// handleMetadata routes to the appropriate version-specific handler
func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
glog.Warningf("🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d", apiVersion, correlationID, len(requestBody))
var response []byte
var err error
switch apiVersion {
case 0:
response, err = h.HandleMetadataV0(correlationID, requestBody)
@ -2909,7 +2914,7 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques
err = fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}
}
if err != nil {
glog.Warningf("🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v", apiVersion, correlationID, err)
} else {

Loading…
Cancel
Save