
Update Produce v2+ to store all records from batch to SMQ

commit d7e1c83ca8 by chrislu, 2 months ago (pull/7231/head)
1 changed file: weed/mq/kafka/protocol/produce.go (153 lines changed)
@@ -267,6 +267,134 @@ func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetDat
 	return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
 }
 
+// extractAllRecords parses a Kafka record batch and returns all records' key/value pairs
+func (h *Handler) extractAllRecords(recordSetData []byte) []struct{ Key, Value []byte } {
+	results := make([]struct{ Key, Value []byte }, 0, 8)
+
+	if len(recordSetData) < 61 {
+		// Too small to be a full batch; treat as single opaque record
+		key, value := h.extractFirstRecord(recordSetData)
+		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
+		return results
+	}
+
+	// Parse record batch header (Kafka v2)
+	offset := 0
+	offset += 8 // base_offset
+	_ = binary.BigEndian.Uint32(recordSetData[offset:])
+	offset += 4 // batch_length
+	offset += 4 // partition_leader_epoch
+	if offset >= len(recordSetData) {
+		return results
+	}
+	magic := recordSetData[offset]
+	offset += 1
+	if magic != 2 {
+		// Unsupported magic byte; fall back to single-record extraction
+		key, value := h.extractFirstRecord(recordSetData)
+		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
+		return results
+	}
+
+	// Skip CRC, attributes, last_offset_delta, first/max timestamps, producer info, base sequence
+	offset += 4 // crc
+	offset += 2 // attributes
+	offset += 4 // last_offset_delta
+	offset += 8 // first_timestamp
+	offset += 8 // max_timestamp
+	offset += 8 // producer_id
+	offset += 2 // producer_epoch
+	offset += 4 // base_sequence
+
+	// records_count
+	if offset+4 > len(recordSetData) {
+		return results
+	}
+	recordsCount := int(binary.BigEndian.Uint32(recordSetData[offset:]))
+	offset += 4
+
+	// Iterate records
+	for i := 0; i < recordsCount && offset < len(recordSetData); i++ {
+		// record_length (varint)
+		recLen, n := decodeVarint(recordSetData[offset:])
+		if n == 0 || recLen < 0 {
+			break
+		}
+		offset += n
+		if offset+int(recLen) > len(recordSetData) {
+			break
+		}
+		rec := recordSetData[offset : offset+int(recLen)]
+		offset += int(recLen)
+
+		// Parse record fields
+		rpos := 0
+		if rpos >= len(rec) {
+			break
+		}
+		rpos += 1 // attributes
+
+		// timestamp_delta (varint)
+		_, n = decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+
+		// offset_delta (varint)
+		_, n = decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+
+		// key
+		keyLen, n := decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+		var key []byte
+		if keyLen >= 0 {
+			if rpos+int(keyLen) > len(rec) {
+				continue
+			}
+			key = rec[rpos : rpos+int(keyLen)]
+			rpos += int(keyLen)
+		}
+
+		// value
+		valLen, n := decodeVarint(rec[rpos:])
+		if n == 0 {
+			continue
+		}
+		rpos += n
+		var value []byte
+		if valLen >= 0 {
+			if rpos+int(valLen) > len(rec) {
+				continue
+			}
+			value = rec[rpos : rpos+int(valLen)]
+			rpos += int(valLen)
+		}
+
+		// headers count (varint) - headers are the remainder of the record; skip them
+		_, n = decodeVarint(rec[rpos:])
+		if n == 0 { /* ignore */
+		}
+
+		// normalize nils to empty slices
+		if key == nil {
+			key = []byte{}
+		}
+		if value == nil {
+			value = []byte{}
+		}
+		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
+	}
+	return results
+}
+
 // extractFirstRecord extracts the first record from a Kafka record batch
 func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) {
 	fmt.Printf("DEBUG: extractFirstRecord - parsing %d bytes\n", len(recordSetData))
@@ -640,13 +768,28 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
 			if parseErr != nil {
 				errorCode = 42 // INVALID_RECORD
 			} else if recordCount > 0 {
-				// Use SeaweedMQ integration
-				offsetVal, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
-				if err != nil {
-					errorCode = 1 // UNKNOWN_SERVER_ERROR
-				} else {
-					baseOffset = offsetVal
-				}
+				// Extract all records from the record set and publish each one
+				records := h.extractAllRecords(recordSetData)
+				if len(records) == 0 {
+					// Fallback to first record extraction
+					key, value := h.extractFirstRecord(recordSetData)
+					records = append(records, struct{ Key, Value []byte }{Key: key, Value: value})
+				}
+
+				var firstOffsetSet bool
+				for idx, kv := range records {
+					offsetProduced, prodErr := h.seaweedMQHandler.ProduceRecord(topicName, int32(partitionID), kv.Key, kv.Value)
+					if prodErr != nil {
+						errorCode = 1 // UNKNOWN_SERVER_ERROR
+						break
+					}
+					if idx == 0 {
+						baseOffset = offsetProduced
+						firstOffsetSet = true
+					}
+				}
+				_ = firstOffsetSet
 			}
 		}
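
Behaviorally, the old path published one SMQ entry per batch (the first record only), while the new path calls ProduceRecord once per parsed record and reports the first record's offset as the batch's base offset; firstOffsetSet is computed but then discarded via the blank identifier. The skip arithmetic in extractAllRecords mirrors the fixed 61-byte header of the Kafka v2 record batch format, which is where the len(recordSetData) < 61 guard comes from. Spelled out as illustrative (hypothetical, not part of the commit) Go constants:

const (
	offBaseOffset           = 0  // int64
	offBatchLength          = 8  // int32
	offPartitionLeaderEpoch = 12 // int32
	offMagic                = 16 // int8, must be 2 for this parser
	offCRC                  = 17 // int32
	offAttributes           = 21 // int16
	offLastOffsetDelta      = 23 // int32
	offFirstTimestamp       = 27 // int64
	offMaxTimestamp         = 35 // int64
	offProducerID           = 43 // int64
	offProducerEpoch        = 51 // int16
	offBaseSequence         = 53 // int32
	offRecordsCount         = 57 // int32
	batchHeaderSize         = 61 // varint-framed records start here
)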

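Each record inside the batch is itself varint-framed. A hypothetical encoder for a single v2 record makes explicit the field order the parsing loop walks (encodeVarint is the zigzag inverse of the decodeVarint sketch above; none of these helpers are part of the actual commit):

func encodeRecordV2(timestampDelta, offsetDelta int64, key, value []byte) []byte {
	body := []byte{0} // attributes (currently unused per record)
	body = append(body, encodeVarint(timestampDelta)...)
	body = append(body, encodeVarint(offsetDelta)...)
	body = appendBytesField(body, key)   // varint length (-1 for nil), then bytes
	body = appendBytesField(body, value) // same framing as the key
	body = append(body, encodeVarint(0)...) // header count: no headers
	return append(encodeVarint(int64(len(body))), body...) // record_length prefix
}

func appendBytesField(dst, b []byte) []byte {
	if b == nil {
		return append(dst, encodeVarint(-1)...) // null key/value
	}
	dst = append(dst, encodeVarint(int64(len(b)))...)
	return append(dst, b...)
}

func encodeVarint(v int64) []byte {
	u := uint64((v << 1) ^ (v >> 63)) // zigzag encode
	var out []byte
	for u >= 0x80 {
		out = append(out, byte(u)|0x80)
		u >>= 7
	}
	return append(out, byte(u))
}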