From 5425f647bc4c79bc3d7d04463a10339c28023bc0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 16:06:07 -0700 Subject: [PATCH] Validate that the unmarshaled RecordValue --- weed/mq/kafka/protocol/fetch.go | 38 ++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index a2dbcc8e6..5daf8eeeb 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -7,6 +7,7 @@ import ( "hash/crc32" "strings" "time" + "unicode/utf8" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" @@ -1146,10 +1147,11 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return recordValueBytes } - // Validate that the unmarshaled RecordValue has valid field data + // Validate that the unmarshaled RecordValue is actually a valid RecordValue // Protobuf unmarshal is lenient and can succeed with garbage data for random bytes - // If Fields is nil or empty, this is not a valid RecordValue - return raw bytes - if recordValue.Fields == nil || len(recordValue.Fields) == 0 { + // We need to check if this looks like a real RecordValue or just random bytes + if !h.isValidRecordValue(recordValue) { + // Not a valid RecordValue - return raw bytes as-is return recordValueBytes } @@ -1165,6 +1167,36 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return h.recordValueToJSON(recordValue) } +// isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes +func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue) bool { + // Empty or nil Fields means not a valid RecordValue + if recordValue == nil || recordValue.Fields == nil || len(recordValue.Fields) == 0 { + return false + } + + // Check if field names are valid UTF-8 strings (not binary garbage) + // Real RecordValue messages have proper field names like "name", "age", etc. + // Random bytes parsed as protobuf often create non-UTF8 or very short field names + for fieldName, fieldValue := range recordValue.Fields { + // Field name should be valid UTF-8 + if !utf8.ValidString(fieldName) { + return false + } + + // Field name should have reasonable length (at least 1 char, at most 1000) + if len(fieldName) == 0 || len(fieldName) > 1000 { + return false + } + + // Field value should not be nil + if fieldValue == nil || fieldValue.Kind == nil { + return false + } + } + + return true +} + // encodeRecordValueToConfluentFormat re-encodes a RecordValue back to Confluent format func (h *Handler) encodeRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) { if recordValue == nil {