From 0019149ae2acc53fd084a4a0f9184cbf52817b31 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Fri, 17 Oct 2025 17:54:29 -0700
Subject: [PATCH] Add comprehensive debug logging to diagnose message corruption in GitHub Actions

This commit adds detailed debug logging throughout the message flow to help
diagnose the 'Message content mismatch' error observed in GitHub Actions:

1. Mock backend flow (unit tests):
   - [MOCK_STORE]: Log when storing messages to mock handler
   - [MOCK_RETRIEVE]: Log when retrieving messages from mock handler

2. Real SMQ backend flow (GitHub Actions):
   - [LOG_BUFFER_UNMARSHAL]: Log when unmarshaling LogEntry from log buffer
   - [BROKER_SEND]: Log when broker sends data to subscriber clients

3. Gateway decode flow (both backends):
   - [DECODE_START]: Log message bytes before decoding
   - [DECODE_NO_SCHEMA]: Log when returning raw bytes (schema disabled)
   - [DECODE_INVALID_RV]: Log when RecordValue validation fails
   - [DECODE_VALID_RV]: Log when valid RecordValue detected

All new logs use glog.Infof() so they appear without requiring -v flags.
This will help identify where data corruption occurs in the CI environment.
---
 weed/mq/broker/broker_grpc_sub.go          | 12 +++++++++++
 weed/mq/kafka/gateway/test_mock_handler.go | 25 ++++++++++++++++++++++
 weed/mq/kafka/protocol/fetch.go            | 21 +++++++++++++++++-
 weed/util/log_buffer/log_read.go           | 12 +++++++++++
 4 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 51a74c6a9..a43315971 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -272,6 +272,18 @@ subscribeLoop:
 				TsNs: logEntry.TsNs,
 			}
 
+			// DEBUG: Log broker sending data to client
+			valuePreview := ""
+			if len(logEntry.Data) > 0 {
+				if len(logEntry.Data) <= 50 {
+					valuePreview = string(logEntry.Data)
+				} else {
+					valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
+				}
+			}
+			glog.Infof("[BROKER_SEND] Sending to client=%s valueLen=%d valuePreview=%q offset=%d",
+				clientName, len(logEntry.Data), valuePreview, logEntry.Offset)
+
 			if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
 				Data: dataMsg,
 			}}); err != nil {
diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go
index ef0a012ef..bb86bc0bc 100644
--- a/weed/mq/kafka/gateway/test_mock_handler.go
+++ b/weed/mq/kafka/gateway/test_mock_handler.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer_client"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
 	filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -121,6 +122,18 @@ func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName stri
 	offset := m.offsets[topicName][partitionID]
 	m.offsets[topicName][partitionID]++
 
+	// DEBUG: Log what's being stored
+	valuePreview := ""
+	if len(value) > 0 {
+		if len(value) <= 50 {
+			valuePreview = string(value)
+		} else {
+			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(value[:50]), len(value))
+		}
+	}
+	glog.Infof("[MOCK_STORE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
+		topicName, partitionID, offset, len(value), valuePreview)
+
 	// Store record
 	record := &mockRecord{
 		key: key,
@@ -155,6 +168,18 @@ func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic strin
 	result := make([]integration.SMQRecord, 0, maxRecords)
 	for _, record := range partitionRecords {
 		if record.GetOffset() >= fromOffset {
+			// DEBUG: Log what's being retrieved
+			valuePreview := ""
+			if len(record.GetValue()) > 0 {
+				if len(record.GetValue()) <= 50 {
+					valuePreview = string(record.GetValue())
+				} else {
+					valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(record.GetValue()[:50]), len(record.GetValue()))
+				}
+			}
+			glog.Infof("[MOCK_RETRIEVE] topic=%s partition=%d offset=%d valueLen=%d valuePreview=%q",
+				topic, partition, record.GetOffset(), len(record.GetValue()), valuePreview)
+
 			result = append(result, record)
 			if len(result) >= maxRecords {
 				break
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index 21a1ed8c6..de4999d0a 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -1132,10 +1132,23 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 		return nil
 	}
 
+	// DEBUG: Log what we're decoding
+	valuePreview := ""
+	if len(recordValueBytes) > 0 {
+		if len(recordValueBytes) <= 50 {
+			valuePreview = string(recordValueBytes)
+		} else {
+			valuePreview = fmt.Sprintf("%s...(total %d bytes)", string(recordValueBytes[:50]), len(recordValueBytes))
+		}
+	}
+	glog.Infof("[DECODE_START] topic=%s schemaEnabled=%v bytesLen=%d preview=%q",
+		topicName, h.IsSchemaEnabled(), len(recordValueBytes), valuePreview)
+
 	// For system topics like _schemas, _consumer_offsets, etc.,
 	// return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
 	// and should NOT be processed as RecordValue protobuf messages.
 	if strings.HasPrefix(topicName, "_") {
+		glog.Infof("[DECODE_SYSTEM_TOPIC] Returning raw bytes for system topic %s", topicName)
 		return recordValueBytes
 	}
 
@@ -1143,6 +1156,7 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	// All messages are stored as raw bytes when schema management is disabled
 	// Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
 	if !h.IsSchemaEnabled() {
+		glog.Infof("[DECODE_NO_SCHEMA] Returning raw bytes (schema disabled)")
 		return recordValueBytes
 	}
 
@@ -1159,9 +1173,12 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	// We need to check if this looks like a real RecordValue or just random bytes
 	if !h.isValidRecordValue(recordValue, recordValueBytes) {
 		// Not a valid RecordValue - return raw bytes as-is
+		glog.Infof("[DECODE_INVALID_RV] Not a valid RecordValue, returning raw bytes")
 		return recordValueBytes
 	}
 
+	glog.Infof("[DECODE_VALID_RV] Valid RecordValue detected, re-encoding...")
+
 	// If schema management is enabled, re-encode the RecordValue to Confluent format
 	if h.IsSchemaEnabled() {
 		if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil {
@@ -1171,7 +1188,9 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
 	}
 
 	// Fallback: convert RecordValue to JSON
-	return h.recordValueToJSON(recordValue)
+	result := h.recordValueToJSON(recordValue)
+	glog.Infof("[DECODE_END] Returning JSON converted bytes, len=%d", len(result))
+	return result
 }
 
 // isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 77f03ddb8..a10926509 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -355,6 +355,18 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
 			continue
 		}
 
+		// DEBUG: Log unmarshaled entry with data preview
+		dataPreview := ""
+		if len(logEntry.Data) > 0 {
+			if len(logEntry.Data) <= 50 {
+				dataPreview = string(logEntry.Data)
+			} else {
+				dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(logEntry.Data[:50]), len(logEntry.Data))
+			}
+		}
+		glog.Infof("[LOG_BUFFER_UNMARSHAL] Offset=%d TsNs=%d dataLen=%d dataPreview=%q",
+			logEntry.Offset, logEntry.TsNs, len(logEntry.Data), dataPreview)
+
 		glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
 
 		// Handle offset-based filtering for offset-based start positions
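
Note: the same 50-byte preview/truncation logic is inlined at all five logging sites in this patch. A minimal sketch of how it could be shared is below; the helper name previewBytes and its package placement are assumptions for illustration, not part of the patch.

    package logutil

    import "fmt"

    // previewBytes returns a log-safe preview of data: the bytes themselves when
    // the payload is at most max bytes long, otherwise the first max bytes plus
    // the total length. Hypothetical helper mirroring the inlined logic above.
    func previewBytes(data []byte, max int) string {
    	if len(data) == 0 {
    		return ""
    	}
    	if len(data) <= max {
    		return string(data)
    	}
    	return fmt.Sprintf("%s...(total %d bytes)", string(data[:max]), len(data))
    }

With such a helper, each call site would reduce to a single line, e.g. glog.Infof("[BROKER_SEND] ... valuePreview=%q ...", previewBytes(logEntry.Data, 50), ...).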