|
|
@ -1146,6 +1146,13 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB |
|
|
return recordValueBytes |
|
|
return recordValueBytes |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Validate that the unmarshaled RecordValue has valid field data
|
|
|
|
|
|
// Protobuf unmarshal is lenient and can succeed with garbage data for random bytes
|
|
|
|
|
|
// If Fields is nil or empty, this is not a valid RecordValue - return raw bytes
|
|
|
|
|
|
if recordValue.Fields == nil || len(recordValue.Fields) == 0 { |
|
|
|
|
|
return recordValueBytes |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// If schema management is enabled, re-encode the RecordValue to Confluent format
|
|
|
// If schema management is enabled, re-encode the RecordValue to Confluent format
|
|
|
if h.IsSchemaEnabled() { |
|
|
if h.IsSchemaEnabled() { |
|
|
if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil { |
|
|
if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil { |
|
|
|