diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 0c806f807..ab1b1cb21 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -132,7 +132,11 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a break } - recordSetData := requestBody[offset : offset+int(recordSetSize)] + // CRITICAL FIX: Make a copy of recordSetData to prevent buffer sharing corruption + // The slice requestBody[offset:offset+int(recordSetSize)] shares the underlying array + // with the request buffer, which can be reused and cause data corruption + recordSetData := make([]byte, recordSetSize) + copy(recordSetData, requestBody[offset:offset+int(recordSetSize)]) offset += int(recordSetSize) // Response: partition_id(4) + error_code(2) + base_offset(8) + log_append_time(8) + log_start_offset(8) @@ -696,7 +700,11 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(requestBody) < offset+int(recordSetSize) { break } - recordSetData := requestBody[offset : offset+int(recordSetSize)] + // CRITICAL FIX: Make a copy of recordSetData to prevent buffer sharing corruption + // The slice requestBody[offset:offset+int(recordSetSize)] shares the underlying array + // with the request buffer, which can be reused and cause data corruption + recordSetData := make([]byte, recordSetSize) + copy(recordSetData, requestBody[offset:offset+int(recordSetSize)]) offset += int(recordSetSize) // Process the record set and store in ledger