From 3025b6abfc41e5df174b42305163ff2a286fa932 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 16:40:53 -0700 Subject: [PATCH] skip if If schema management is not enabled --- weed/mq/kafka/protocol/fetch.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index e8998fc04..21a1ed8c6 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -1139,6 +1139,13 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return recordValueBytes } + // CRITICAL: If schema management is not enabled, we should NEVER try to parse as RecordValue + // All messages are stored as raw bytes when schema management is disabled + // Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing + if !h.IsSchemaEnabled() { + return recordValueBytes + } + // Try to unmarshal as RecordValue recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil {