
Add comprehensive debug logging to diagnose message corruption in GitHub Actions

This commit adds detailed debug logging throughout the message flow to help
diagnose the 'Message content mismatch' error observed in GitHub Actions:

1. Mock backend flow (unit tests):
   - [MOCK_STORE]: Log when storing messages to mock handler
   - [MOCK_RETRIEVE]: Log when retrieving messages from mock handler

2. Real SMQ backend flow (GitHub Actions):
   - [LOG_BUFFER_UNMARSHAL]: Log when unmarshaling LogEntry from log buffer
   - [BROKER_SEND]: Log when broker sends data to subscriber clients

3. Gateway decode flow (both backends):
   - [DECODE_START]: Log message bytes before decoding
   - [DECODE_NO_SCHEMA]: Log when returning raw bytes (schema disabled)
   - [DECODE_INVALID_RV]: Log when RecordValue validation fails
   - [DECODE_VALID_RV]: Log when valid RecordValue detected

All new logs use glog.Infof() so they appear without requiring -v flags.
This will help identify where data corruption occurs in the CI environment.
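For reference, the same preview-truncation logic is inlined at each of the four call sites in the diff below. A minimal sketch of that shared pattern as a standalone helper (the commit does not actually extract one; the name previewBytes is purely illustrative):

	// previewBytes mirrors the inlined logic added at each logging site:
	// payloads of 50 bytes or fewer are shown whole, longer payloads are cut
	// to the first 50 bytes with a total-length suffix.
	func previewBytes(data []byte) string {
		if len(data) == 0 {
			return ""
		}
		if len(data) <= 50 {
			return string(data)
		}
		return fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
	}

Each log line prints the preview with the %q verb, so non-printable bytes are escaped instead of garbling the CI log output.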
Branch: pull/7329/head
Author: chrislu, 4 days ago
Commit: 0019149ae2
Changed files:
  1. weed/mq/broker/broker_grpc_sub.go (12 lines changed)
  2. weed/mq/kafka/gateway/test_mock_handler.go (25 lines changed)
  3. weed/mq/kafka/protocol/fetch.go (21 lines changed)
  4. weed/util/log_buffer/log_read.go (12 lines changed)

weed/mq/broker/broker_grpc_sub.go (12 lines changed)

@@ -272,6 +272,18 @@ subscribeLoop:
		TsNs: logEntry.TsNs,
	}
	// DEBUG: Log broker sending data to client
	valuePreview := ""
	if len(logEntry.Data) > 0 {
		if len(logEntry.Data) <= 50 {
			valuePreview = string(logEntry.Data)
		} else {
			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
		}
	}
	glog.Infof("[BROKER_SEND] Sending to client=%s valueLen=%d valuePreview=%q offset=%d",
		clientName, len(logEntry.Data), valuePreview, logEntry.Offset)
	if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
		Data: dataMsg,
	}}); err != nil {

weed/mq/kafka/gateway/test_mock_handler.go (25 lines changed)

@@ -6,6 +6,7 @@ import (
	"sync"
	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
	filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -121,6 +122,18 @@ func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName stri
	offset := m.offsets[topicName][partitionID]
	m.offsets[topicName][partitionID]++
	// DEBUG: Log what's being stored
	valuePreview := ""
	if len(value) > 0 {
		if len(value) <= 50 {
			valuePreview = string(value)
		} else {
			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(value[:50]), len(value))
		}
	}
	glog.Infof("[MOCK_STORE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
		topicName, partitionID, offset, len(value), valuePreview)
	// Store record
	record := &mockRecord{
		key: key,
@@ -155,6 +168,18 @@ func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic strin
	result := make([]integration.SMQRecord, 0, maxRecords)
	for _, record := range partitionRecords {
		if record.GetOffset() >= fromOffset {
			// DEBUG: Log what's being retrieved
			valuePreview := ""
			if len(record.GetValue()) > 0 {
				if len(record.GetValue()) <= 50 {
					valuePreview = string(record.GetValue())
				} else {
					valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(record.GetValue()[:50]), len(record.GetValue()))
				}
			}
			glog.Infof("[MOCK_RETRIEVE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
				topic, partition, record.GetOffset(), len(record.GetValue()), valuePreview)
			result = append(result, record)
			if len(result) >= maxRecords {
				break

weed/mq/kafka/protocol/fetch.go (21 lines changed)

@@ -1132,10 +1132,23 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
		return nil
	}
	// DEBUG: Log what we're decoding
	valuePreview := ""
	if len(recordValueBytes) > 0 {
		if len(recordValueBytes) <= 50 {
			valuePreview = string(recordValueBytes)
		} else {
			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(recordValueBytes[:50]), len(recordValueBytes))
		}
	}
	glog.Infof("[DECODE_START] topic=%s schemaEnabled=%v bytesLen=%d preview=%q",
		topicName, h.IsSchemaEnabled(), len(recordValueBytes), valuePreview)
	// For system topics like _schemas, _consumer_offsets, etc.,
	// return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
	// and should NOT be processed as RecordValue protobuf messages.
	if strings.HasPrefix(topicName, "_") {
		glog.Infof("[DECODE_SYSTEM_TOPIC] Returning raw bytes for system topic %s", topicName)
		return recordValueBytes
	}
@@ -1143,6 +1156,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
	// All messages are stored as raw bytes when schema management is disabled
	// Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
	if !h.IsSchemaEnabled() {
		glog.Infof("[DECODE_NO_SCHEMA] Returning raw bytes (schema disabled)")
		return recordValueBytes
	}
@@ -1159,9 +1173,12 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
	// We need to check if this looks like a real RecordValue or just random bytes
	if !h.isValidRecordValue(recordValue, recordValueBytes) {
		// Not a valid RecordValue - return raw bytes as-is
		glog.Infof("[DECODE_INVALID_RV] Not a valid RecordValue, returning raw bytes")
		return recordValueBytes
	}
	glog.Infof("[DECODE_VALID_RV] Valid RecordValue detected, re-encoding...")
	// If schema management is enabled, re-encode the RecordValue to Confluent format
	if h.IsSchemaEnabled() {
		if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil {
@@ -1171,7 +1188,9 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
	}
	// Fallback: convert RecordValue to JSON
	return h.recordValueToJSON(recordValue)
	result := h.recordValueToJSON(recordValue)
	glog.Infof("[DECODE_END] Returning JSON converted bytes, len=%d", len(result))
	return result
}
// isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes
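The [DECODE_INVALID_RV]/[DECODE_VALID_RV] pair exists because protobuf parsing is lenient: proto.Unmarshal can return a nil error for payload bytes that merely happen to resemble valid wire format. A rough sketch of the guard this hunk instruments (assuming RecordValue comes from weed/pb/schema_pb; the variable names are illustrative, not copied from fetch.go):

	// A nil error from proto.Unmarshal is necessary but not sufficient:
	// raw Kafka payloads can occasionally decode into a RecordValue full of
	// garbage fields, so the result is double-checked before re-encoding.
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil {
		return recordValueBytes // definitely not a RecordValue
	}
	if !h.isValidRecordValue(recordValue, recordValueBytes) {
		return recordValueBytes // parsed, but only by accident
	}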

weed/util/log_buffer/log_read.go (12 lines changed)

@@ -355,6 +355,18 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
			continue
		}
		// DEBUG: Log unmarshaled entry with data preview
		dataPreview := ""
		if len(logEntry.Data) > 0 {
			if len(logEntry.Data) <= 50 {
				dataPreview = string(logEntry.Data)
			} else {
				dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
			}
		}
		glog.Infof("[LOG_BUFFER_UNMARSHAL] Offset=%d TsNs=%d dataLen=%d dataPreview=%q",
			logEntry.Offset, logEntry.TsNs, len(logEntry.Data), dataPreview)
		glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
		// Handle offset-based filtering for offset-based start positions
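Taken together, the tags let one message be followed stage by stage in the CI log. A hypothetical trace for the mock-backend path with schema management disabled (topic, offset, and payload are made-up sample values; real lines also carry the standard glog prefix):

	[MOCK_STORE] topic=test-topic partition=0 offset=0 valueLen=11 valuePreview="hello world"
	[MOCK_RETRIEVE] topic=test-topic partition=0 offset=0 valueLen=11 valuePreview="hello world"
	[DECODE_START] topic=test-topic schemaEnabled=false bytesLen=11 preview="hello world"
	[DECODE_NO_SCHEMA] Returning raw bytes (schema disabled)

On the real SMQ path the first two lines are replaced by [LOG_BUFFER_UNMARSHAL] and [BROKER_SEND]. Wherever the preview first changes between adjacent stages is where the corruption is being introduced.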
