|
|
@ -582,8 +582,6 @@ func decodeVarint(data []byte) (int64, int) { |
|
|
|
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { |
|
|
|
startTime := time.Now() |
|
|
|
|
|
|
|
// DEBUG: Log request details
|
|
|
|
|
|
|
|
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
|
|
|
|
// In v2+, the main differences are:
|
|
|
|
// - Request: transactional_id field (nullable string) at the beginning
|
|
|
@ -621,8 +619,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
_ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) |
|
|
|
offset += 4 |
|
|
|
|
|
|
|
// DEBUG: Log acks and timeout
|
|
|
|
|
|
|
|
// Remember if this is fire-and-forget mode
|
|
|
|
isFireAndForget := acks == 0 |
|
|
|
if isFireAndForget { |
|
|
@ -635,8 +631,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) |
|
|
|
offset += 4 |
|
|
|
|
|
|
|
// DEBUG: Log topics count
|
|
|
|
|
|
|
|
// If topicsCount is implausible, there might be a parsing issue
|
|
|
|
if topicsCount > 1000 { |
|
|
|
return nil, fmt.Errorf("Produce v%d request has implausible topics count: %d", apiVersion, topicsCount) |
|
|
@ -670,14 +664,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
topicName := string(requestBody[offset : offset+int(topicNameSize)]) |
|
|
|
offset += int(topicNameSize) |
|
|
|
|
|
|
|
// DEBUG: Log topic being processed
|
|
|
|
|
|
|
|
// Parse partitions count
|
|
|
|
partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) |
|
|
|
offset += 4 |
|
|
|
|
|
|
|
// DEBUG: Log partitions count
|
|
|
|
|
|
|
|
// Response: topic name (STRING: 2 bytes length + data)
|
|
|
|
response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) |
|
|
|
response = append(response, []byte(topicName)...) |
|
|
@ -711,16 +701,12 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
// Check if topic exists; for v2+ do NOT auto-create
|
|
|
|
topicExists := h.seaweedMQHandler.TopicExists(topicName) |
|
|
|
|
|
|
|
// DEBUG: Log topic existence and record set details
|
|
|
|
|
|
|
|
if !topicExists { |
|
|
|
errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
|
|
|
|
} else { |
|
|
|
// Process the record set (lenient parsing)
|
|
|
|
recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
|
|
|
|
|
|
|
|
// DEBUG: Log record count and parse error
|
|
|
|
|
|
|
|
if parseErr != nil { |
|
|
|
errorCode = 42 // INVALID_RECORD
|
|
|
|
} else if recordCount > 0 { |
|
|
@ -728,16 +714,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
// extractAllRecords handles fallback internally for various cases
|
|
|
|
records := h.extractAllRecords(recordSetData) |
|
|
|
|
|
|
|
// DEBUG: Log extracted records count
|
|
|
|
|
|
|
|
if len(records) > 0 { |
|
|
|
// DEBUG: Log first record details (especially for Noop with null value)
|
|
|
|
if len(records[0].Value) > 0 { |
|
|
|
} else { |
|
|
|
// Log the key bytes in hex for identification
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if len(records) == 0 { |
|
|
|
errorCode = 42 // INVALID_RECORD
|
|
|
|
} else { |
|
|
@ -754,8 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
break |
|
|
|
} |
|
|
|
|
|
|
|
// DEBUG: Log offset received from broker
|
|
|
|
|
|
|
|
if idx == 0 { |
|
|
|
baseOffset = offsetProduced |
|
|
|
firstOffsetSet = true |
|
|
@ -765,7 +739,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
_ = firstOffsetSet |
|
|
|
} |
|
|
|
} else { |
|
|
|
// DEBUG: Critical case - recordCount is 0!
|
|
|
|
// Try to extract anyway - this might be a Noop record
|
|
|
|
records := h.extractAllRecords(recordSetData) |
|
|
|
if len(records) > 0 { |
|
|
@ -783,8 +756,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// DEBUG: Log response that will be sent
|
|
|
|
|
|
|
|
// Build correct Produce v2+ response for this partition
|
|
|
|
// Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]
|
|
|
|
|
|
|
@ -834,105 +805,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, |
|
|
|
return response, nil |
|
|
|
} |
|
|
|
|
|
|
|
// processSchematizedMessage processes a message that may contain schema information
|
|
|
|
// ctx controls the publish timeout - if client cancels, process operation is cancelled
|
|
|
|
func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error { |
|
|
|
// System topics should bypass schema processing entirely
|
|
|
|
if h.isSystemTopic(topicName) { |
|
|
|
return nil // Skip schema processing for system topics
|
|
|
|
} |
|
|
|
|
|
|
|
// Only process if schema management is enabled
|
|
|
|
if !h.IsSchemaEnabled() { |
|
|
|
return nil // Skip schema processing
|
|
|
|
} |
|
|
|
|
|
|
|
// Check if message is schematized
|
|
|
|
if !h.schemaManager.IsSchematized(messageBytes) { |
|
|
|
return nil // Not schematized, continue with normal processing
|
|
|
|
} |
|
|
|
|
|
|
|
// Decode the message
|
|
|
|
decodedMsg, err := h.schemaManager.DecodeMessage(messageBytes) |
|
|
|
if err != nil { |
|
|
|
// In permissive mode, we could continue with raw bytes
|
|
|
|
// In strict mode, we should reject the message
|
|
|
|
return fmt.Errorf("schema decoding failed: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
// Store the decoded message using SeaweedMQ
|
|
|
|
return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg) |
|
|
|
} |
|
|
|
|
|
|
|
// storeDecodedMessage stores a decoded message using mq.broker integration
|
|
|
|
// ctx controls the publish timeout - if client cancels, store operation is cancelled
|
|
|
|
func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error { |
|
|
|
// Use broker client if available
|
|
|
|
if h.IsBrokerIntegrationEnabled() { |
|
|
|
// Use the original Kafka message key
|
|
|
|
key := originalKey |
|
|
|
if key == nil { |
|
|
|
key = []byte{} // Use empty byte slice for null keys
|
|
|
|
} |
|
|
|
|
|
|
|
// Publish the decoded RecordValue to mq.broker
|
|
|
|
err := h.brokerClient.PublishSchematizedMessage(topicName, key, decodedMsg.Envelope.OriginalBytes) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to publish to mq.broker: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Use SeaweedMQ integration
|
|
|
|
if h.seaweedMQHandler != nil { |
|
|
|
// Use the original Kafka message key
|
|
|
|
key := originalKey |
|
|
|
if key == nil { |
|
|
|
key = []byte{} // Use empty byte slice for null keys
|
|
|
|
} |
|
|
|
// Store the original Confluent Wire Format bytes (magic byte + schema ID + payload)
|
|
|
|
// NOT just the Avro payload, so we can return them as-is during fetch without re-encoding
|
|
|
|
value := decodedMsg.Envelope.OriginalBytes |
|
|
|
|
|
|
|
_, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("failed to produce to SeaweedMQ: %w", err) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
return fmt.Errorf("no SeaweedMQ handler available") |
|
|
|
} |
|
|
|
|
|
|
|
// extractMessagesFromRecordSet extracts individual messages from a record set with compression support
|
|
|
|
func (h *Handler) extractMessagesFromRecordSet(recordSetData []byte) ([][]byte, error) { |
|
|
|
// Be lenient for tests: accept arbitrary data if length is sufficient
|
|
|
|
if len(recordSetData) < 10 { |
|
|
|
return nil, fmt.Errorf("record set too small: %d bytes", len(recordSetData)) |
|
|
|
} |
|
|
|
|
|
|
|
// For tests, just return the raw data as a single message without deep parsing
|
|
|
|
return [][]byte{recordSetData}, nil |
|
|
|
} |
|
|
|
|
|
|
|
// validateSchemaCompatibility checks if a message is compatible with existing schema
|
|
|
|
func (h *Handler) validateSchemaCompatibility(topicName string, messageBytes []byte) error { |
|
|
|
if !h.IsSchemaEnabled() { |
|
|
|
return nil // No validation if schema management is disabled
|
|
|
|
} |
|
|
|
|
|
|
|
// Extract schema information from message
|
|
|
|
schemaID, messageFormat, err := h.schemaManager.GetSchemaInfo(messageBytes) |
|
|
|
if err != nil { |
|
|
|
return nil // Not schematized, no validation needed
|
|
|
|
} |
|
|
|
|
|
|
|
// Perform comprehensive schema validation
|
|
|
|
return h.performSchemaValidation(topicName, schemaID, messageFormat, messageBytes) |
|
|
|
} |
|
|
|
|
|
|
|
// performSchemaValidation performs comprehensive schema validation for a topic
|
|
|
|
func (h *Handler) performSchemaValidation(topicName string, schemaID uint32, messageFormat schema.Format, messageBytes []byte) error { |
|
|
|
// 1. Check if topic is configured to require schemas
|
|
|
|