diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index 05ff1a16c..341edb3fb 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -267,6 +267,132 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
 	return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
 }
 
+// extractAllRecords parses a Kafka record batch and returns all records' key/value pairs
+func (h *Handler) extractAllRecords(recordSetData []byte) []struct{ Key, Value []byte } {
+	results := make([]struct{ Key, Value []byte }, 0, 8)
+
+	if len(recordSetData) < 61 {
+		// Too small to be a full batch; treat as single opaque record
+		key, value := h.extractFirstRecord(recordSetData)
+		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
+		return results
+	}
+
+	// Parse record batch header (Kafka v2)
+	offset := 0
+	offset += 8 // base_offset
+	_ = binary.BigEndian.Uint32(recordSetData[offset:])
+	offset += 4 // batch_length
+	offset += 4 // partition_leader_epoch
+	if offset >= len(recordSetData) {
+		return results
+	}
+	magic := recordSetData[offset] // magic
+	offset += 1
+	if magic != 2 {
+		// Unsupported, fallback
+		key, value := h.extractFirstRecord(recordSetData)
+		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
+		return results
+	}
+
+	// Skip CRC, attributes, last_offset_delta, first/max timestamps, producer info, base seq
+	offset += 4 // crc
+	offset += 2 // attributes
+	offset += 4 // last_offset_delta
+	offset += 8 // first_timestamp
+	offset += 8 // max_timestamp
+	offset += 8 // producer_id
+	offset += 2 // producer_epoch
+	offset += 4 // base_sequence
+
+	// records_count
+	if offset+4 > len(recordSetData) {
+		return results
+	}
+	recordsCount := int(binary.BigEndian.Uint32(recordSetData[offset:]))
+	offset += 4
+
+	// Iterate records
+	for i := 0; i < recordsCount && offset < len(recordSetData); i++ {
+		// record_length (varint)
+		recLen, n := decodeVarint(recordSetData[offset:])
+		if n == 0 || recLen < 0 {
+			break
+		}
+		offset += n
+		if offset+int(recLen) > len(recordSetData) {
+			break
+		}
+		rec := recordSetData[offset : offset+int(recLen)]
+		offset += int(recLen)
+
+		// Parse record fields
+		rpos := 0
+		if rpos >= len(rec) {
+			break
+		}
+		rpos += 1 // attributes
+
+		// timestamp_delta (varint)
+		_, n = decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+		// offset_delta (varint)
+		_, n = decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+
+		// key
+		keyLen, n := decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+		var key []byte
+		if keyLen >= 0 {
+			if rpos+int(keyLen) > len(rec) {
+				continue
+			}
+			key = rec[rpos : rpos+int(keyLen)]
+			rpos += int(keyLen)
+		}
+
+		// value
+		valLen, n := decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+		var value []byte
+		if valLen >= 0 {
+			if rpos+int(valLen) > len(rec) {
+				continue
+			}
+			value = rec[rpos : rpos+int(valLen)]
+			rpos += int(valLen)
+		}
+
+		// headers count (varint) - read and ignored
+		_, _ = decodeVarint(rec[rpos:])
+
+		// normalize nils to empty slices
+		if key == nil {
+			key = []byte{}
+		}
+		if value == nil {
+			value = []byte{}
+		}
+		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
+	}
+
+	return results
+}
+
 
 // extractFirstRecord extracts the first record from a Kafka record batch
 func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) {
 	fmt.Printf("DEBUG: extractFirstRecord - parsing %d bytes\n", len(recordSetData))
@@ -640,13 +766,24 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 		if parseErr != nil {
 			errorCode = 42 // INVALID_RECORD
 		} else if recordCount > 0 {
-			// Use SeaweedMQ integration
-			offsetVal, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
-			if err != nil {
-				errorCode = 1 // UNKNOWN_SERVER_ERROR
-			} else {
-				baseOffset = offsetVal
+			// Extract all records from the record set and publish each one
+			records := h.extractAllRecords(recordSetData)
+			if len(records) == 0 {
+				// Fallback to first record extraction
+				key, value := h.extractFirstRecord(recordSetData)
+				records = append(records, struct{ Key, Value []byte }{Key: key, Value: value})
 			}
+
+			for idx, kv := range records {
+				offsetProduced, prodErr := h.seaweedMQHandler.ProduceRecord(topicName, int32(partitionID), kv.Key, kv.Value)
+				if prodErr != nil {
+					errorCode = 1 // UNKNOWN_SERVER_ERROR
+					break
+				}
+				if idx == 0 {
+					baseOffset = offsetProduced
+				}
+			}
 		}
 	}
 
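Reviewer note: the parsing above hinges entirely on the package's existing `decodeVarint` helper. The per-record fields in a v2 batch (record_length, timestamp_delta, offset_delta, key/value lengths, header count) are zigzag-encoded signed varints, so the helper is assumed to return the decoded signed value plus the number of bytes consumed, with `n == 0` on truncated input; that is exactly the condition the loops above bail out on. A minimal sketch of that contract, under those assumptions (`decodeKafkaVarint` is a hypothetical stand-in, not the package's actual function):

```go
// decodeKafkaVarint decodes a zigzag-encoded signed varint as used in Kafka
// v2 record fields. It returns the decoded value and the number of bytes
// consumed; n == 0 signals truncated or malformed input, matching how the
// parsing loops in this patch bail out.
func decodeKafkaVarint(data []byte) (value int64, n int) {
	var raw uint64
	var shift uint
	for i, b := range data {
		raw |= uint64(b&0x7f) << shift
		if b&0x80 == 0 {
			// zigzag decode: 0→0, 1→-1, 2→1, 3→-2, ...
			return int64(raw>>1) ^ -int64(raw&1), i + 1
		}
		shift += 7
		if shift > 63 {
			return 0, 0 // too many continuation bytes
		}
	}
	return 0, 0 // ran out of bytes mid-varint
}
```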
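A cheap way to guard this parser would be a round-trip test that hand-builds a v2 batch with the inverse encoder and checks that every key/value comes back out. A sketch, assuming the package is named `protocol`, that `Handler`'s zero value is usable for this pure parsing path (only the fallback paths touch other state), and using a hypothetical zigzag encoder mirroring the decoder above:

```go
package protocol

import (
	"bytes"
	"encoding/binary"
	"testing"
)

// encodeKafkaVarint zigzag-encodes v and appends the varint bytes to dst
// (the inverse of the hypothetical decoder sketched above).
func encodeKafkaVarint(dst []byte, v int64) []byte {
	u := uint64((v << 1) ^ (v >> 63))
	for u >= 0x80 {
		dst = append(dst, byte(u)|0x80)
		u >>= 7
	}
	return append(dst, byte(u))
}

func TestExtractAllRecords_RoundTrip(t *testing.T) {
	// encode one record body: attributes, timestamp_delta, offset_delta,
	// key, value, headers count
	encRecord := func(key, value []byte) []byte {
		body := []byte{0}                                 // attributes
		body = encodeKafkaVarint(body, 0)                 // timestamp_delta
		body = encodeKafkaVarint(body, 0)                 // offset_delta
		body = encodeKafkaVarint(body, int64(len(key)))   // key length
		body = append(body, key...)
		body = encodeKafkaVarint(body, int64(len(value))) // value length
		body = append(body, value...)
		body = encodeKafkaVarint(body, 0)                 // no headers
		rec := encodeKafkaVarint(nil, int64(len(body)))   // record_length prefix
		return append(rec, body...)
	}

	want := [][2][]byte{
		{[]byte("k1"), []byte("v1")},
		{[]byte("k2"), []byte("v2")},
	}

	batch := make([]byte, 61) // zeroed v2 batch header
	batch[16] = 2             // magic byte follows base_offset+batch_length+leader_epoch
	binary.BigEndian.PutUint32(batch[57:61], uint32(len(want))) // records_count
	for _, kv := range want {
		batch = append(batch, encRecord(kv[0], kv[1])...)
	}

	got := (&Handler{}).extractAllRecords(batch)
	if len(got) != len(want) {
		t.Fatalf("got %d records, want %d", len(got), len(want))
	}
	for i, kv := range want {
		if !bytes.Equal(got[i].Key, kv[0]) || !bytes.Equal(got[i].Value, kv[1]) {
			t.Errorf("record %d: got (%q, %q), want (%q, %q)",
				i, got[i].Key, got[i].Value, kv[0], kv[1])
		}
	}
}
```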