diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 189b2d283..ba50c8944 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -149,11 +149,11 @@ func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, a // Process the record set recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused if parseErr != nil { - errorCode = 42 // INVALID_RECORD - } else if recordCount > 0 { - // Use SeaweedMQ integration - offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData) - if err != nil { + errorCode = 42 // INVALID_RECORD + } else if recordCount > 0 { + // Use SeaweedMQ integration + offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData) + if err != nil { // Check if this is a schema validation error and add delay to prevent overloading if h.isSchemaValidationError(err) { time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures @@ -1554,7 +1554,7 @@ func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.R if err != nil { return nil, fmt.Errorf("failed to create Avro decoder: %w", err) } - + recordType, err := decoder.InferRecordType() if err != nil { return nil, err @@ -1588,7 +1588,7 @@ func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*sch if err != nil { return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err) } - + recordType, err := decoder.InferRecordType() if err != nil { return nil, err @@ -1621,7 +1621,7 @@ func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.R if err != nil { return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err) } - + recordType, err := decoder.InferRecordType() if err != nil { return nil, err