diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index e8998fc04..21a1ed8c6 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -1139,6 +1139,13 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return recordValueBytes } + // CRITICAL: If schema management is not enabled, we should NEVER try to parse as RecordValue + // All messages are stored as raw bytes when schema management is disabled + // Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing + if !h.IsSchemaEnabled() { + return recordValueBytes + } + // Try to unmarshal as RecordValue recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil {