
Fix Kafka message corruption due to buffer sharing in produce requests

CRITICAL BUG FIX: The recordSetData slice was sharing the underlying array with the
request buffer, causing data corruption when the request buffer was reused or
modified. This led to Kafka record batch header bytes overwriting stored message
data, resulting in corrupted messages like:

Expected: 'test-message-kafka-go-default'
Got:      '������������kafka-go-default'

The corruption pattern matched Kafka batch header bytes (0x01, 0x00, 0xFF, etc.),
indicating buffer sharing between the produce request parsing and message storage.
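
For illustration only (this is not the gateway's actual parsing code, and the
variable names are hypothetical), a minimal Go program showing how a slice taken
from a reused request buffer aliases the stored message:

    // Illustration only: hypothetical names, not the gateway's parsing code.
    package main

    import "fmt"

    func main() {
        requestBuf := []byte("test-message-kafka-go-default")

        // "Storing" the message by slicing the request buffer keeps a
        // reference to the same backing array - nothing is copied.
        stored := requestBuf[:]

        // When the connection reuses the buffer, e.g. to hold the next
        // request's record batch header bytes, the stored message changes too.
        copy(requestBuf, []byte{0x01, 0x00, 0x00, 0x00, 0xFF, 0xFF})

        fmt.Printf("%q\n", stored) // no longer "test-message-kafka-go-default"
    }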

SOLUTION: Make a defensive copy of recordSetData in both produce request handlers
(handleProduceV0V1 and handleProduceV2Plus) to prevent slice aliasing issues.
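
A minimal sketch of the defensive copy (requestBody, offset, and recordSetSize
are placeholder names for illustration; see weed/mq/kafka/protocol/produce.go
for the actual handlers):

    // Sketch with placeholder names, not the verbatim produce.go code.
    // Instead of aliasing the request buffer:
    //   recordSetData := requestBody[offset : offset+recordSetSize]
    // allocate a fresh backing array and copy the bytes into it:
    recordSetData := make([]byte, recordSetSize)
    copy(recordSetData, requestBody[offset:offset+recordSetSize])
    // recordSetData now owns its memory, so later reuse or modification of
    // requestBody can no longer overwrite the stored message.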

Changes:
- weed/mq/kafka/protocol/produce.go: Copy recordSetData to prevent buffer sharing
- Remove debug logging added during investigation

Fixes:
- TestClientCompatibility/KafkaGoVersionCompatibility/kafka-go-default
- TestClientCompatibility/KafkaGoVersionCompatibility/kafka-go-with-batching
- Message content mismatch errors in GitHub Actions CI

This was a subtle memory safety issue that only manifested under certain timing
conditions, making it appear intermittent in CI environments.

Make a copy of recordSetData to prevent buffer sharing corruption
Branch: pull/7329/head
Author: chrislu
Commit: 466aaa0ec3
4 changed files:
  weed/mq/broker/broker_grpc_sub.go           (11 changed lines)
  weed/mq/kafka/gateway/test_mock_handler.go  (24 changed lines)
  weed/mq/kafka/protocol/fetch.go             (20 changed lines)
  weed/util/log_buffer/log_read.go            (11 changed lines)

weed/mq/broker/broker_grpc_sub.go (11 changed lines)

@@ -272,17 +272,6 @@ subscribeLoop:
         TsNs: logEntry.TsNs,
     }
-    // DEBUG: Log broker sending data to client
-    valuePreview := ""
-    if len(logEntry.Data) > 0 {
-        if len(logEntry.Data) <= 50 {
-            valuePreview = string(logEntry.Data)
-        } else {
-            valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
-        }
-    }
-    glog.Infof("[BROKER_SEND] Sending to client=%s valueLen=%d valuePreview=%q offset=%d",
-        clientName, len(logEntry.Data), valuePreview, logEntry.Offset)
     if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
         Data: dataMsg,

weed/mq/kafka/gateway/test_mock_handler.go (24 changed lines)

@@ -6,7 +6,6 @@ import (
     "sync"
     "github.com/seaweedfs/seaweedfs/weed/filer_client"
-    "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
     "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
     filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -122,17 +121,6 @@ func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName stri
     offset := m.offsets[topicName][partitionID]
     m.offsets[topicName][partitionID]++
-    // DEBUG: Log what's being stored
-    valuePreview := ""
-    if len(value) > 0 {
-        if len(value) <= 50 {
-            valuePreview = string(value)
-        } else {
-            valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(value[:50]), len(value))
-        }
-    }
-    glog.Infof("[MOCK_STORE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
-        topicName, partitionID, offset, len(value), valuePreview)
     // Store record
     record := &mockRecord{
@@ -168,18 +156,6 @@ func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic strin
     result := make([]integration.SMQRecord, 0, maxRecords)
     for _, record := range partitionRecords {
         if record.GetOffset() >= fromOffset {
-            // DEBUG: Log what's being retrieved
-            valuePreview := ""
-            if len(record.GetValue()) > 0 {
-                if len(record.GetValue()) <= 50 {
-                    valuePreview = string(record.GetValue())
-                } else {
-                    valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(record.GetValue()[:50]), len(record.GetValue()))
-                }
-            }
-            glog.Infof("[MOCK_RETRIEVE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
-                topic, partition, record.GetOffset(), len(record.GetValue()), valuePreview)
             result = append(result, record)
             if len(result) >= maxRecords {
                 break

weed/mq/kafka/protocol/fetch.go (20 changed lines)

@@ -1132,23 +1132,11 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
         return nil
     }
-    // DEBUG: Log what we're decoding
-    valuePreview := ""
-    if len(recordValueBytes) > 0 {
-        if len(recordValueBytes) <= 50 {
-            valuePreview = string(recordValueBytes)
-        } else {
-            valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(recordValueBytes[:50]), len(recordValueBytes))
-        }
-    }
-    glog.Infof("[DECODE_START] topic=%s schemaEnabled=%v bytesLen=%d preview=%q",
-        topicName, h.IsSchemaEnabled(), len(recordValueBytes), valuePreview)
     // For system topics like _schemas, _consumer_offsets, etc.,
     // return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
     // and should NOT be processed as RecordValue protobuf messages.
     if strings.HasPrefix(topicName, "_") {
-        glog.Infof("[DECODE_SYSTEM_TOPIC] Returning raw bytes for system topic %s", topicName)
         return recordValueBytes
     }
@@ -1156,7 +1144,6 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
     // All messages are stored as raw bytes when schema management is disabled
     // Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
     if !h.IsSchemaEnabled() {
-        glog.Infof("[DECODE_NO_SCHEMA] Returning raw bytes (schema disabled)")
         return recordValueBytes
     }
@@ -1173,12 +1160,9 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
     // We need to check if this looks like a real RecordValue or just random bytes
     if !h.isValidRecordValue(recordValue, recordValueBytes) {
         // Not a valid RecordValue - return raw bytes as-is
-        glog.Infof("[DECODE_INVALID_RV] Not a valid RecordValue, returning raw bytes")
         return recordValueBytes
     }
-    glog.Infof("[DECODE_VALID_RV] Valid RecordValue detected, re-encoding...")
     // If schema management is enabled, re-encode the RecordValue to Confluent format
     if h.IsSchemaEnabled() {
         if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil {
@@ -1188,9 +1172,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
     }
     // Fallback: convert RecordValue to JSON
-    result := h.recordValueToJSON(recordValue)
-    glog.Infof("[DECODE_END] Returning JSON converted bytes, len=%d", len(result))
-    return result
+    return h.recordValueToJSON(recordValue)
 }
 // isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes

weed/util/log_buffer/log_read.go (11 changed lines)

@@ -355,17 +355,6 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
         continue
     }
-    // DEBUG: Log unmarshaled entry with data preview
-    dataPreview := ""
-    if len(logEntry.Data) > 0 {
-        if len(logEntry.Data) <= 50 {
-            dataPreview = string(logEntry.Data)
-        } else {
-            dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
-        }
-    }
-    glog.Infof("[LOG_BUFFER_UNMARSHAL] Offset=%d TsNs=%d dataLen=%d dataPreview=%q",
-        logEntry.Offset, logEntry.TsNs, len(logEntry.Data), dataPreview)
     glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
