@ -594,6 +594,9 @@ func decodeVarint(data []byte) (int64, int) {
func ( h * Handler ) handleProduceV2Plus ( ctx context . Context , correlationID uint32 , apiVersion uint16 , requestBody [ ] byte ) ( [ ] byte , error ) {
startTime := time . Now ( )
// DEBUG: Log request details
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] handleProduceV2Plus START: apiVersion=%d, requestBodyLen=%d, correlationID=%d" , apiVersion , len ( requestBody ) , correlationID )
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
// In v2+, the main differences are:
// - Request: transactional_id field (nullable string) at the beginning
@ -616,7 +619,8 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if len ( requestBody ) < offset + int ( txIDLen ) {
return nil , fmt . Errorf ( "Produce v%d request transactional_id too short" , apiVersion )
}
_ = string ( requestBody [ offset : offset + int ( txIDLen ) ] ) // txID
txID := string ( requestBody [ offset : offset + int ( txIDLen ) ] )
glog . V ( 4 ) . Infof ( "[NOOP-DEBUG] transactional_id=%s" , txID )
offset += int ( txIDLen )
}
}
@ -631,6 +635,9 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
timeout := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// DEBUG: Log acks and timeout
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] acks=%d, timeout_ms=%d" , acks , timeout )
// CRITICAL FIX: Apply client-specified timeout to context
// If client specifies a timeout, create a new context with that timeout
// This ensures broker connections respect the client's expectations
@ -643,7 +650,9 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
// Remember if this is fire-and-forget mode
isFireAndForget := acks == 0
if isFireAndForget {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Fire-and-forget mode (acks=0)" )
} else {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Waiting for broker response (acks=%d)" , acks )
}
if len ( requestBody ) < offset + 4 {
@ -652,6 +661,9 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
topicsCount := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// DEBUG: Log topics count
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] topicsCount=%d" , topicsCount )
// If topicsCount is implausible, there might be a parsing issue
if topicsCount > 1000 {
return nil , fmt . Errorf ( "Produce v%d request has implausible topics count: %d" , apiVersion , topicsCount )
@ -685,10 +697,16 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
topicName := string ( requestBody [ offset : offset + int ( topicNameSize ) ] )
offset += int ( topicNameSize )
// DEBUG: Log topic being processed
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Topic %d/%d: name=%s" , i + 1 , topicsCount , topicName )
// Parse partitions count
partitionsCount := binary . BigEndian . Uint32 ( requestBody [ offset : offset + 4 ] )
offset += 4
// DEBUG: Log partitions count
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Topic %s: partitionsCount=%d" , topicName , partitionsCount )
// Response: topic name (STRING: 2 bytes length + data)
response = append ( response , byte ( topicNameSize >> 8 ) , byte ( topicNameSize ) )
response = append ( response , [ ] byte ( topicName ) ... )
@ -722,28 +740,52 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
// Check if topic exists; for v2+ do NOT auto-create
topicExists := h . seaweedMQHandler . TopicExists ( topicName )
// DEBUG: Log topic existence and record set details
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: topicExists=%v, recordSetDataLen=%d" , partitionID , topicExists , len ( recordSetData ) )
if ! topicExists {
glog . Warningf ( "[NOOP-DEBUG] Partition %d: Topic does not exist" , partitionID )
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
} else {
// Process the record set (lenient parsing)
recordCount , _ , parseErr := h . parseRecordSet ( recordSetData ) // totalSize unused
// DEBUG: Log record count and parse error
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: parseRecordSet returned recordCount=%d, parseErr=%v" , partitionID , recordCount , parseErr )
if parseErr != nil {
glog . Warningf ( "[NOOP-DEBUG] Partition %d: parseRecordSet error: %v" , partitionID , parseErr )
errorCode = 42 // INVALID_RECORD
} else if recordCount > 0 {
// Extract all records from the record set and publish each one
// extractAllRecords handles fallback internally for various cases
records := h . extractAllRecords ( recordSetData )
// DEBUG: Log extracted records count
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Extracted %d records from record set (recordCount was %d)" , partitionID , len ( records ) , recordCount )
if len ( records ) > 0 {
// DEBUG: Log first record details (especially for Noop with null value)
if len ( records [ 0 ] . Value ) > 0 {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Record 0 has value, len=%d" , partitionID , len ( records [ 0 ] . Value ) )
} else {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Record 0 has NULL value (likely Noop record), keyLen=%d" , partitionID , len ( records [ 0 ] . Key ) )
// Log the key bytes in hex for identification
glog . V ( 4 ) . Infof ( "[NOOP-DEBUG] Partition %d: Record 0 key (hex): %x" , partitionID , records [ 0 ] . Key )
}
}
if len ( records ) == 0 {
glog . Warningf ( "[NOOP-DEBUG] Partition %d: No records extracted despite recordCount=%d" , partitionID , recordCount )
errorCode = 42 // INVALID_RECORD
} else {
var firstOffsetSet bool
for idx , kv := range records {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)" , partitionID , idx , len ( records ) , len ( kv . Key ) , len ( kv . Value ) )
offsetProduced , prodErr := h . produceSchemaBasedRecord ( ctx , topicName , int32 ( partitionID ) , kv . Key , kv . Value )
if prodErr != nil {
glog . Warningf ( "[NOOP-DEBUG] Partition %d: Record %d produce error: %v" , partitionID , idx , prodErr )
// Check if this is a schema validation error and add delay to prevent overloading
if h . isSchemaValidationError ( prodErr ) {
time . Sleep ( 200 * time . Millisecond ) // Brief delay for schema validation failures
@ -751,6 +793,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
errorCode = 1 // UNKNOWN_SERVER_ERROR
break
}
// DEBUG: Log offset received from broker
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d" , partitionID , idx , offsetProduced )
if idx == 0 {
baseOffset = offsetProduced
firstOffsetSet = true
@ -759,9 +805,34 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
_ = firstOffsetSet
}
} else {
// DEBUG: Critical case - recordCount is 0!
glog . Warningf ( "[NOOP-DEBUG] CRITICAL Partition %d: recordCount=0, but we should still try to extract records! recordSetDataLen=%d" , partitionID , len ( recordSetData ) )
// Try to extract anyway - this might be a Noop record
records := h . extractAllRecords ( recordSetData )
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Even with recordCount=0, extracted %d records" , partitionID , len ( records ) )
if len ( records ) > 0 {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Processing %d records despite recordCount=0" , partitionID , len ( records ) )
for idx , kv := range records {
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Publishing record %d/%d (keyLen=%d, valueLen=%d)" , partitionID , idx , len ( records ) , len ( kv . Key ) , len ( kv . Value ) )
offsetProduced , prodErr := h . produceSchemaBasedRecord ( ctx , topicName , int32 ( partitionID ) , kv . Key , kv . Value )
if prodErr != nil {
glog . Warningf ( "[NOOP-DEBUG] Partition %d: Record %d produce error: %v" , partitionID , idx , prodErr )
errorCode = 1 // UNKNOWN_SERVER_ERROR
break
}
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Record %d produced at offset=%d" , partitionID , idx , offsetProduced )
if idx == 0 {
baseOffset = offsetProduced
}
}
}
}
}
// DEBUG: Log response that will be sent
glog . V ( 2 ) . Infof ( "[NOOP-DEBUG] Partition %d: Sending response - offset=%d, errorCode=%d" , partitionID , baseOffset , errorCode )
// Build correct Produce v2+ response for this partition
// Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]