|
|
@ -1139,6 +1139,13 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB |
|
|
|
return recordValueBytes |
|
|
|
} |
|
|
|
|
|
|
|
// CRITICAL: If schema management is not enabled, we should NEVER try to parse as RecordValue
|
|
|
|
// All messages are stored as raw bytes when schema management is disabled
|
|
|
|
// Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
|
|
|
|
if !h.IsSchemaEnabled() { |
|
|
|
return recordValueBytes |
|
|
|
} |
|
|
|
|
|
|
|
// Try to unmarshal as RecordValue
|
|
|
|
recordValue := &schema_pb.RecordValue{} |
|
|
|
if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil { |
|
|
|