diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index f4795ae14..5e37fff6f 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -697,7 +697,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Connection closed, stop processing return case <-time.After(5 * time.Second): - glog.Errorf("[%s] DEADLOCK: Control plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID) } case <-ctx.Done(): // Context cancelled, drain remaining requests before exiting @@ -776,7 +775,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Connection closed, stop processing return case <-time.After(5 * time.Second): - glog.Errorf("[%s] DEADLOCK: Data plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID) } case <-ctx.Done(): // Context cancelled, drain remaining requests before exiting diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index a51bb9ef2..cc61fd18a 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -583,7 +583,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, startTime := time.Now() // DEBUG: Log request details - glog.Infof("[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d", apiVersion, len(requestBody), correlationID) // For now, use simplified parsing similar to v0/v1 but handle v2+ response format // In v2+, the main differences are: @@ -607,8 +606,7 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(requestBody) < offset+int(txIDLen) { return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion) } - txID := string(requestBody[offset : offset+int(txIDLen)]) - glog.Infof("[NOOP-DEBUG] transactional_id=%s", txID) + _ = string(requestBody[offset : offset+int(txIDLen)]) offset += int(txIDLen) } } @@ -620,18 +618,15 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) offset += 2 - timeout := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + _ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 // DEBUG: Log acks and timeout - glog.Infof("[NOOP-DEBUG] acks=%d, timeout_ms=%d", acks, timeout) // Remember if this is fire-and-forget mode isFireAndForget := acks == 0 if isFireAndForget { - glog.Infof("[NOOP-DEBUG] Fire-and-forget mode (acks=0)") } else { - glog.Infof("[NOOP-DEBUG] Waiting for broker response (acks=%d)", acks) } if len(requestBody) < offset+4 { @@ -641,7 +636,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, offset += 4 // DEBUG: Log topics count - glog.Infof("[NOOP-DEBUG] topicsCount=%d", topicsCount) // If topicsCount is implausible, there might be a parsing issue if topicsCount > 1000 { @@ -677,14 +671,12 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, offset += int(topicNameSize) // DEBUG: Log topic being processed - glog.Infof("[NOOP-DEBUG] Topic %d/%d: name=%s", i+1, topicsCount, topicName) // Parse partitions count partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 // DEBUG: Log partitions count - glog.Infof("[NOOP-DEBUG] Topic %s: partitionsCount=%d", topicName, partitionsCount) // Response: topic name (STRING: 2 bytes length + data) response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) @@ -720,20 +712,16 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, topicExists := h.seaweedMQHandler.TopicExists(topicName) // DEBUG: Log topic existence and record set details - glog.Infof("[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d", partitionID, topicExists, len(recordSetData)) if !topicExists { - glog.Warningf("[NOOP-DEBUG] Partition %d: Topic does not exist", partitionID) errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION } else { // Process the record set (lenient parsing) recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused // DEBUG: Log record count and parse error - glog.Infof("[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v", partitionID, recordCount, parseErr) if parseErr != nil { - glog.Warningf("[NOOP-DEBUG] Partition %d: parseRecordSet error: %v", partitionID, parseErr) errorCode = 42 // INVALID_RECORD } else if recordCount > 0 { // Extract all records from the record set and publish each one @@ -741,30 +729,23 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, records := h.extractAllRecords(recordSetData) // DEBUG: Log extracted records count - glog.Infof("[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)", partitionID, len(records), recordCount) if len(records) > 0 { // DEBUG: Log first record details (especially for Noop with null value) if len(records[0].Value) > 0 { - glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d", partitionID, len(records[0].Value)) } else { - glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d", partitionID, len(records[0].Key)) // Log the key bytes in hex for identification - glog.Infof("[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x", partitionID, records[0].Key) } } if len(records) == 0 { - glog.Warningf("[NOOP-DEBUG] Partition %d: No records extracted despite recordCount=%d", partitionID, recordCount) errorCode = 42 // INVALID_RECORD } else { var firstOffsetSet bool for idx, kv := range records { - glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value)) offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) if prodErr != nil { - glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr) // Check if this is a schema validation error and add delay to prevent overloading if h.isSchemaValidationError(prodErr) { time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures @@ -774,7 +755,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, } // DEBUG: Log offset received from broker - glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced) if idx == 0 { baseOffset = offsetProduced @@ -786,21 +766,15 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, } } else { // DEBUG: Critical case - recordCount is 0! - glog.Warningf("[NOOP-DEBUG] CRITICAL Partition %d: recordCount=0, but we should still try to extract records! recordSetDataLen=%d", partitionID, len(recordSetData)) // Try to extract anyway - this might be a Noop record records := h.extractAllRecords(recordSetData) - glog.Infof("[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records", partitionID, len(records)) if len(records) > 0 { - glog.Infof("[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0", partitionID, len(records)) for idx, kv := range records { - glog.Infof("[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)", partitionID, idx, len(records), len(kv.Key), len(kv.Value)) offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) if prodErr != nil { - glog.Warningf("[NOOP-DEBUG] Partition %d: Record %d produce error: %v", partitionID, idx, prodErr) errorCode = 1 // UNKNOWN_SERVER_ERROR break } - glog.Infof("[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d", partitionID, idx, offsetProduced) if idx == 0 { baseOffset = offsetProduced } @@ -810,7 +784,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, } // DEBUG: Log response that will be sent - glog.Infof("[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d", partitionID, baseOffset, errorCode) // Build correct Produce v2+ response for this partition // Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]