From 9cd0a29f481454dd623b816c689005a4abb99997 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 19:12:49 -0700 Subject: [PATCH] purge --- weed/mq/kafka/protocol/produce.go | 128 ------------------------------ 1 file changed, 128 deletions(-) diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index cc61fd18a..b272f6c89 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -582,8 +582,6 @@ func decodeVarint(data []byte) (int64, int) { func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { startTime := time.Now() - // DEBUG: Log request details - // For now, use simplified parsing similar to v0/v1 but handle v2+ response format // In v2+, the main differences are: // - Request: transactional_id field (nullable string) at the beginning @@ -621,8 +619,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, _ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - // DEBUG: Log acks and timeout - // Remember if this is fire-and-forget mode isFireAndForget := acks == 0 if isFireAndForget { @@ -635,8 +631,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - // DEBUG: Log topics count - // If topicsCount is implausible, there might be a parsing issue if topicsCount > 1000 { return nil, fmt.Errorf("Produce v%d request has implausible topics count: %d", apiVersion, topicsCount) @@ -670,14 +664,10 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, topicName := string(requestBody[offset : offset+int(topicNameSize)]) offset += int(topicNameSize) - // DEBUG: Log topic being processed - // Parse partitions count partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - // DEBUG: Log partitions count - // Response: topic name (STRING: 2 bytes length + data) response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) response = append(response, []byte(topicName)...) @@ -711,16 +701,12 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, // Check if topic exists; for v2+ do NOT auto-create topicExists := h.seaweedMQHandler.TopicExists(topicName) - // DEBUG: Log topic existence and record set details - if !topicExists { errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION } else { // Process the record set (lenient parsing) recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused - // DEBUG: Log record count and parse error - if parseErr != nil { errorCode = 42 // INVALID_RECORD } else if recordCount > 0 { @@ -728,16 +714,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, // extractAllRecords handles fallback internally for various cases records := h.extractAllRecords(recordSetData) - // DEBUG: Log extracted records count - - if len(records) > 0 { - // DEBUG: Log first record details (especially for Noop with null value) - if len(records[0].Value) > 0 { - } else { - // Log the key bytes in hex for identification - } - } - if len(records) == 0 { errorCode = 42 // INVALID_RECORD } else { @@ -754,8 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, break } - // DEBUG: Log offset received from broker - if idx == 0 { baseOffset = offsetProduced firstOffsetSet = true @@ -765,7 +739,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, _ = firstOffsetSet } } else { - // DEBUG: Critical case - recordCount is 0! // Try to extract anyway - this might be a Noop record records := h.extractAllRecords(recordSetData) if len(records) > 0 { @@ -783,8 +756,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, } } - // DEBUG: Log response that will be sent - // Build correct Produce v2+ response for this partition // Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5] @@ -834,105 +805,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, return response, nil } -// processSchematizedMessage processes a message that may contain schema information -// ctx controls the publish timeout - if client cancels, process operation is cancelled -func (h *Handler) processSchematizedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, messageBytes []byte) error { - // System topics should bypass schema processing entirely - if h.isSystemTopic(topicName) { - return nil // Skip schema processing for system topics - } - - // Only process if schema management is enabled - if !h.IsSchemaEnabled() { - return nil // Skip schema processing - } - - // Check if message is schematized - if !h.schemaManager.IsSchematized(messageBytes) { - return nil // Not schematized, continue with normal processing - } - - // Decode the message - decodedMsg, err := h.schemaManager.DecodeMessage(messageBytes) - if err != nil { - // In permissive mode, we could continue with raw bytes - // In strict mode, we should reject the message - return fmt.Errorf("schema decoding failed: %w", err) - } - - // Store the decoded message using SeaweedMQ - return h.storeDecodedMessage(ctx, topicName, partitionID, originalKey, decodedMsg) -} - -// storeDecodedMessage stores a decoded message using mq.broker integration -// ctx controls the publish timeout - if client cancels, store operation is cancelled -func (h *Handler) storeDecodedMessage(ctx context.Context, topicName string, partitionID int32, originalKey []byte, decodedMsg *schema.DecodedMessage) error { - // Use broker client if available - if h.IsBrokerIntegrationEnabled() { - // Use the original Kafka message key - key := originalKey - if key == nil { - key = []byte{} // Use empty byte slice for null keys - } - - // Publish the decoded RecordValue to mq.broker - err := h.brokerClient.PublishSchematizedMessage(topicName, key, decodedMsg.Envelope.OriginalBytes) - if err != nil { - return fmt.Errorf("failed to publish to mq.broker: %w", err) - } - - return nil - } - - // Use SeaweedMQ integration - if h.seaweedMQHandler != nil { - // Use the original Kafka message key - key := originalKey - if key == nil { - key = []byte{} // Use empty byte slice for null keys - } - // Store the original Confluent Wire Format bytes (magic byte + schema ID + payload) - // NOT just the Avro payload, so we can return them as-is during fetch without re-encoding - value := decodedMsg.Envelope.OriginalBytes - - _, err := h.seaweedMQHandler.ProduceRecord(ctx, topicName, partitionID, key, value) - if err != nil { - return fmt.Errorf("failed to produce to SeaweedMQ: %w", err) - } - - return nil - } - - return fmt.Errorf("no SeaweedMQ handler available") -} - -// extractMessagesFromRecordSet extracts individual messages from a record set with compression support -func (h *Handler) extractMessagesFromRecordSet(recordSetData []byte) ([][]byte, error) { - // Be lenient for tests: accept arbitrary data if length is sufficient - if len(recordSetData) < 10 { - return nil, fmt.Errorf("record set too small: %d bytes", len(recordSetData)) - } - - // For tests, just return the raw data as a single message without deep parsing - return [][]byte{recordSetData}, nil -} - -// validateSchemaCompatibility checks if a message is compatible with existing schema -func (h *Handler) validateSchemaCompatibility(topicName string, messageBytes []byte) error { - if !h.IsSchemaEnabled() { - return nil // No validation if schema management is disabled - } - - // Extract schema information from message - schemaID, messageFormat, err := h.schemaManager.GetSchemaInfo(messageBytes) - if err != nil { - return nil // Not schematized, no validation needed - } - - // Perform comprehensive schema validation - return h.performSchemaValidation(topicName, schemaID, messageFormat, messageBytes) -} - // performSchemaValidation performs comprehensive schema validation for a topic func (h *Handler) performSchemaValidation(topicName string, schemaID uint32, messageFormat schema.Format, messageBytes []byte) error { // 1. Check if topic is configured to require schemas