package protocol

import (
	"context"
	"encoding/binary"
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/protobuf/proto"
)

func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	// Version-specific handling
	switch apiVersion {
	case 0, 1:
		return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
	case 2, 3, 4, 5, 6, 7:
		return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
	default:
		return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
	}
}

func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	// Parse Produce v0/v1 request
	// Request format: client_id + acks(2) + timeout(4) + topics_array

	if len(requestBody) < 8 { // client_id_size(2) + acks(2) + timeout(4)
		return nil, fmt.Errorf("Produce request too short")
	}

	// Skip client_id
	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])

	if len(requestBody) < 2+int(clientIDSize) {
		return nil, fmt.Errorf("Produce request client_id too short")
	}

	_ = string(requestBody[2 : 2+int(clientIDSize)]) // clientID
	offset := 2 + int(clientIDSize)

	if len(requestBody) < offset+10 { // acks(2) + timeout(4) + topics_count(4)
		return nil, fmt.Errorf("Produce request missing data")
	}

	// Parse acks and timeout
	_ = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) // acks
	offset += 2
	// Skip timeout_ms(4) so topics_count is read from the correct position
	_ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // timeout_ms
	offset += 4

	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	response := make([]byte, 0, 1024)

	// NOTE: Correlation ID is handled by writeResponseWithHeader
	// Do NOT include it in the response body

	// Topics count (same as request)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
	response = append(response, topicsCountBytes...)

	// Process each topic
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		if len(requestBody) < offset+2 {
			break
		}

		// Parse topic name
		topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		if len(requestBody) < offset+int(topicNameSize)+4 {
			break
		}

		topicName := string(requestBody[offset : offset+int(topicNameSize)])
		offset += int(topicNameSize)

		// Parse partitions count
		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4

		// Check if topic exists, auto-create if it doesn't (simulates auto.create.topics.enable=true)
		topicExists := h.seaweedMQHandler.TopicExists(topicName)

		_ = h.seaweedMQHandler.ListTopics() // existingTopics
		if !topicExists {
			// Use schema-aware topic creation for auto-created topics with configurable default partitions
			defaultPartitions := h.GetDefaultPartitions()
			glog.V(1).Infof("[PRODUCE] Topic %s does not exist, auto-creating with %d partitions", topicName, defaultPartitions)
			if err := h.createTopicWithSchemaSupport(topicName, defaultPartitions); err != nil {
				glog.V(0).Infof("[PRODUCE] ERROR: Failed to auto-create topic %s: %v", topicName, err)
			} else {
				glog.V(1).Infof("[PRODUCE] Successfully auto-created topic %s", topicName)
				// Invalidate cache immediately after creation so consumers can find it
				h.seaweedMQHandler.InvalidateTopicExistsCache(topicName)
				topicExists = true
			}
		} else {
			glog.V(2).Infof("[PRODUCE] Topic %s already exists", topicName)
		}

		// Response: topic_name_size(2) + topic_name + partitions_array
		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
		response = append(response, []byte(topicName)...)

		partitionsCountBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
		response = append(response, partitionsCountBytes...)

		// Process each partition
		for j := uint32(0); j < partitionsCount && offset < len(requestBody); j++ {
			if len(requestBody) < offset+8 {
				break
			}

			// Parse partition: partition_id(4) + record_set_size(4) + record_set
			partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4

			recordSetSize := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4

			if len(requestBody) < offset+int(recordSetSize) {
				break
			}

			// CRITICAL FIX: Make a copy of recordSetData to prevent buffer sharing corruption
			// The slice requestBody[offset:offset+int(recordSetSize)] shares the underlying array
			// with the request buffer, which can be reused and cause data corruption
			recordSetData := make([]byte, recordSetSize)
			copy(recordSetData, requestBody[offset:offset+int(recordSetSize)])
			offset += int(recordSetSize)

			// Response: partition_id(4) + error_code(2) + base_offset(8) + log_append_time(8) + log_start_offset(8)
			partitionIDBytes := make([]byte, 4)
			binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
			response = append(response, partitionIDBytes...)

			var errorCode uint16 = 0
			var baseOffset int64 = 0
			currentTime := time.Now().UnixNano()

			if !topicExists {
				errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
			} else {
				// Process the record set
				recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
				if parseErr != nil {
					errorCode = 42 // INVALID_RECORD
				} else if recordCount > 0 {
					// Use SeaweedMQ integration
					offset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
					if err != nil {
						// Check if this is a schema validation error and add delay to prevent overloading
						if h.isSchemaValidationError(err) {
							time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
						}
						errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
					} else {
						baseOffset = offset
					}
				}
			}

			// Error code
			response = append(response, byte(errorCode>>8), byte(errorCode))

			// Base offset (8 bytes)
			baseOffsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
			response = append(response, baseOffsetBytes...)

			// Log append time (8 bytes) - timestamp when appended
			logAppendTimeBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(logAppendTimeBytes, uint64(currentTime))
			response = append(response, logAppendTimeBytes...)

			// Log start offset (8 bytes) - same as base for now
			logStartOffsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(logStartOffsetBytes, uint64(baseOffset))
			response = append(response, logStartOffsetBytes...)
		}
	}

	// Add throttle time at the end (4 bytes)
	response = append(response, 0, 0, 0, 0)

	// Even for acks=0, kafka-go expects a minimal response structure
	return response, nil
}

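// Worked example (illustrative, not taken from a real capture): a minimal Produce v0
// request body in the layout parsed by handleProduceV0V1 above, with client_id "go",
// acks=1, timeout_ms=5000, one topic "t", one partition 0, and an empty record set:
//
//	00 02 67 6f              client_id: length=2, "go"
//	00 01                    acks = 1
//	00 00 13 88              timeout_ms = 5000
//	00 00 00 01              topics_count = 1
//	00 01 74                 topic_name: length=1, "t"
//	00 00 00 01              partitions_count = 1
//	00 00 00 00              partition_id = 0
//	00 00 00 00              record_set_size = 0 (record set bytes would follow)
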
// parseRecordSet parses a Kafka record set using the enhanced record batch parser
// Now supports:
// - Proper record batch format parsing (v2)
// - Compression support (gzip, snappy, lz4, zstd)
// - CRC32 validation
// - Individual record extraction
func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, totalSize int32, err error) {
	// Heuristic: permit short inputs for tests
	if len(recordSetData) < 61 {
		// If very small, decide error vs fallback
		if len(recordSetData) < 8 {
			return 0, 0, fmt.Errorf("failed to parse record batch: record set too small: %d bytes", len(recordSetData))
		}
		// If we have at least 20 bytes, attempt to read a count at [16:20]
		if len(recordSetData) >= 20 {
			cnt := int32(binary.BigEndian.Uint32(recordSetData[16:20]))
			if cnt <= 0 || cnt > 1000000 {
				cnt = 1
			}
			return cnt, int32(len(recordSetData)), nil
		}
		// Otherwise default to 1 record
		return 1, int32(len(recordSetData)), nil
	}

	parser := NewRecordBatchParser()

	// Parse the record batch with CRC validation
	batch, err := parser.ParseRecordBatchWithValidation(recordSetData, true)
	if err != nil {
		// If CRC validation fails, try without validation for backward compatibility
		batch, err = parser.ParseRecordBatch(recordSetData)
		if err != nil {
			return 0, 0, fmt.Errorf("failed to parse record batch: %w", err)
		}
	}

	return batch.RecordCount, int32(len(recordSetData)), nil
}

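// For reference, the 61-byte minimum checked above is the fixed size of the Kafka
// record batch v2 header (field widths in bytes):
//
//	base_offset(8) + batch_length(4) + partition_leader_epoch(4) + magic(1) + crc(4)
//	+ attributes(2) + last_offset_delta(4) + first_timestamp(8) + max_timestamp(8)
//	+ producer_id(8) + producer_epoch(2) + base_sequence(4) + records_count(4) = 61
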
// produceToSeaweedMQ publishes the records of a record set to SeaweedMQ (simplified for Phase 2)
// ctx controls the publish timeout - if the client cancels, the produce operation is cancelled
func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
	// Extract all records from the record set and publish each one
	// extractAllRecords handles fallback internally for various cases
	records := h.extractAllRecords(recordSetData)

	if len(records) == 0 {
		return 0, fmt.Errorf("failed to parse Kafka record set: no records extracted")
	}

	// Publish all records and return the offset of the first record (base offset)
	var baseOffset int64
	for idx, kv := range records {
		offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
		if err != nil {
			return 0, err
		}
		if idx == 0 {
			baseOffset = offsetProduced
		}
	}

	return baseOffset, nil
}

// extractAllRecords parses a Kafka record batch and returns all records' key/value pairs
func (h *Handler) extractAllRecords(recordSetData []byte) []struct{ Key, Value []byte } {
	results := make([]struct{ Key, Value []byte }, 0, 8)

	if len(recordSetData) < 61 {
		// Too small to be a full batch; treat as single opaque record
		key, value := h.extractFirstRecord(recordSetData)
		// Always include records, even if both key and value are null
		// Schema Registry Noop records may have null values
		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
		return results
	}

	// Parse record batch header (Kafka v2)
	offset := 0
	_ = int64(binary.BigEndian.Uint64(recordSetData[offset:])) // baseOffset
	offset += 8                                                // base_offset
	_ = binary.BigEndian.Uint32(recordSetData[offset:])        // batchLength
	offset += 4                                                // batch_length
	_ = binary.BigEndian.Uint32(recordSetData[offset:])        // partitionLeaderEpoch
	offset += 4                                                // partition_leader_epoch

	if offset >= len(recordSetData) {
		return results
	}
	magic := recordSetData[offset] // magic
	offset += 1

	if magic != 2 {
		// Unsupported, fallback
		key, value := h.extractFirstRecord(recordSetData)
		// Always include records, even if both key and value are null
		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
		return results
	}

	// Skip CRC, read attributes to check compression
	offset += 4 // crc
	attributes := binary.BigEndian.Uint16(recordSetData[offset:])
	offset += 2 // attributes

	// Check compression codec from attributes (bits 0-2)
	compressionCodec := compression.CompressionCodec(attributes & 0x07)

	offset += 4 // last_offset_delta
	offset += 8 // first_timestamp
	offset += 8 // max_timestamp
	offset += 8 // producer_id
	offset += 2 // producer_epoch
	offset += 4 // base_sequence

	// records_count
	if offset+4 > len(recordSetData) {
		return results
	}
	recordsCount := int(binary.BigEndian.Uint32(recordSetData[offset:]))
	offset += 4

	// Extract and decompress the records section
	recordsData := recordSetData[offset:]
	if compressionCodec != compression.None {
		decompressed, err := compression.Decompress(compressionCodec, recordsData)
		if err != nil {
			// Fallback to extractFirstRecord
			key, value := h.extractFirstRecord(recordSetData)
			results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
			return results
		}
		recordsData = decompressed
	}
	// Reset offset to start of records data (whether compressed or not)
	offset = 0

	// Iterate records
	for i := 0; i < recordsCount && offset < len(recordsData); i++ {
		// record_length is a SIGNED zigzag-encoded varint (like all varints in Kafka record format)
		recLen, n := decodeVarint(recordsData[offset:])
		if n == 0 || recLen <= 0 {
			break
		}
		offset += n
		if offset+int(recLen) > len(recordsData) {
			break
		}
		rec := recordsData[offset : offset+int(recLen)]
		offset += int(recLen)

		// Parse record fields
		rpos := 0
		if rpos >= len(rec) {
			break
		}
		rpos += 1 // attributes

		// timestamp_delta (varint)
		var nBytes int
		_, nBytes = decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes
		// offset_delta (varint)
		_, nBytes = decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes

		// key
		keyLen, nBytes := decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes
		var key []byte
		if keyLen >= 0 {
			if rpos+int(keyLen) > len(rec) {
				continue
			}
			key = rec[rpos : rpos+int(keyLen)]
			rpos += int(keyLen)
		}

		// value
		valLen, nBytes := decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes
		var value []byte
		if valLen >= 0 {
			if rpos+int(valLen) > len(rec) {
				continue
			}
			value = rec[rpos : rpos+int(valLen)]
			rpos += int(valLen)
		}

		// headers (varint) - skip
		_, n = decodeVarint(rec[rpos:])
		if n == 0 { /* ignore */
		}

		// DO NOT normalize nils to empty slices - Kafka distinguishes null vs empty
		// Keep nil as nil, empty as empty

		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
	}

	return results
}

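// Worked example of a single record body as decoded above (illustrative): a record
// with attributes=0, timestamp_delta=0, offset_delta=0, key "k", value "v", and no
// headers encodes as 9 bytes, preceded by its zigzag varint length 0x12 (= 9):
//
//	12                       record_length = 9
//	00                       attributes
//	00                       timestamp_delta = 0
//	00                       offset_delta = 0
//	02 6b                    key_length = 1, key = "k"
//	02 76                    value_length = 1, value = "v"
//	00                       headers_count = 0
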
// extractFirstRecord extracts the first record from a Kafka record batch
func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) {
	if len(recordSetData) < 61 {
		// Record set too small to contain a valid Kafka v2 batch
		return nil, nil
	}

	offset := 0

	// Parse record batch header (Kafka v2 format)
	// base_offset(8) + batch_length(4) + partition_leader_epoch(4) + magic(1) + crc(4) + attributes(2)
	// + last_offset_delta(4) + first_timestamp(8) + max_timestamp(8) + producer_id(8) + producer_epoch(2)
	// + base_sequence(4) + records_count(4) = 61 bytes header

	offset += 8                                                // skip base_offset
	_ = int32(binary.BigEndian.Uint32(recordSetData[offset:])) // batchLength unused
	offset += 4                                                // batch_length

	offset += 4 // skip partition_leader_epoch
	magic := recordSetData[offset]
	offset += 1 // magic byte

	if magic != 2 {
		// Unsupported magic byte - only Kafka v2 format is supported
		return nil, nil
	}

	offset += 4 // skip crc
	offset += 2 // skip attributes
	offset += 4 // skip last_offset_delta
	offset += 8 // skip first_timestamp
	offset += 8 // skip max_timestamp
	offset += 8 // skip producer_id
	offset += 2 // skip producer_epoch
	offset += 4 // skip base_sequence

	recordsCount := int32(binary.BigEndian.Uint32(recordSetData[offset:]))
	offset += 4 // records_count

	if recordsCount == 0 {
		// No records in batch
		return nil, nil
	}

	// Parse first record
	if offset >= len(recordSetData) {
		// Not enough data to parse record
		return nil, nil
	}

	// Read record length (unsigned varint)
	recordLengthU32, varintLen, err := DecodeUvarint(recordSetData[offset:])
	if err != nil || varintLen == 0 {
		// Invalid varint encoding
		return nil, nil
	}
	recordLength := int64(recordLengthU32)
	offset += varintLen

	if offset+int(recordLength) > len(recordSetData) {
		// Record length exceeds available data
		return nil, nil
	}

	recordData := recordSetData[offset : offset+int(recordLength)]
	recordOffset := 0

	// Parse record: attributes(1) + timestamp_delta(varint) + offset_delta(varint) + key + value + headers
	recordOffset += 1 // skip attributes

	// Skip timestamp_delta (varint)
	_, varintLen = decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid timestamp_delta varint
		return nil, nil
	}
	recordOffset += varintLen

	// Skip offset_delta (varint)
	_, varintLen = decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid offset_delta varint
		return nil, nil
	}
	recordOffset += varintLen

	// Read key length and key
	keyLength, varintLen := decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid key length varint
		return nil, nil
	}
	recordOffset += varintLen

	var key []byte
	if keyLength == -1 {
		key = nil // null key
	} else if keyLength == 0 {
		key = []byte{} // empty key
	} else {
		if recordOffset+int(keyLength) > len(recordData) {
			// Key length exceeds available data
			return nil, nil
		}
		key = recordData[recordOffset : recordOffset+int(keyLength)]
		recordOffset += int(keyLength)
	}

	// Read value length and value
	valueLength, varintLen := decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid value length varint
		return nil, nil
	}
	recordOffset += varintLen

	var value []byte
	if valueLength == -1 {
		value = nil // null value
	} else if valueLength == 0 {
		value = []byte{} // empty value
	} else {
		if recordOffset+int(valueLength) > len(recordData) {
			// Value length exceeds available data
			return nil, nil
		}
		value = recordData[recordOffset : recordOffset+int(valueLength)]
	}

	// Preserve null semantics - don't convert null to empty
	// Schema Registry Noop records specifically use null values
	return key, value
}

// decodeVarint decodes a variable-length integer from bytes using zigzag encoding
// Returns the decoded value and the number of bytes consumed
func decodeVarint(data []byte) (int64, int) {
	if len(data) == 0 {
		return 0, 0
	}

	var result int64
	var shift uint
	var bytesRead int

	for i, b := range data {
		if i > 9 { // varints can be at most 10 bytes
			return 0, 0 // invalid varint
		}

		bytesRead++
		result |= int64(b&0x7F) << shift

		if (b & 0x80) == 0 {
			// Most significant bit is 0, we're done
			// Apply zigzag decoding for signed integers
			return (result >> 1) ^ (-(result & 1)), bytesRead
		}

		shift += 7
	}

	return 0, 0 // incomplete varint
}

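// Zigzag examples for the decoding above (illustrative): the single varint byte 0x00
// decodes to 0, 0x01 to -1, 0x02 to 1, 0x03 to -2, and 0x04 to 2; the two-byte varint
// 0xAC 0x02 carries the raw value 300, which zigzag-decodes to 150.
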
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
	// In v2+, the main differences are:
	// - Request: transactional_id field (nullable string) at the beginning
	// - Response: throttle_time_ms field at the end (v1+)

	// Parse Produce v2+ request format (client_id already stripped in HandleConn)
	// v2: acks(INT16) + timeout_ms(INT32) + topics(ARRAY)
	// v3+: transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY)

	offset := 0

	// transactional_id only exists in v3+
	if apiVersion >= 3 {
		if len(requestBody) < offset+2 {
			return nil, fmt.Errorf("Produce v%d request too short for transactional_id", apiVersion)
		}
		txIDLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
		offset += 2
		if txIDLen >= 0 {
			if len(requestBody) < offset+int(txIDLen) {
				return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
			}
			_ = string(requestBody[offset : offset+int(txIDLen)])
			offset += int(txIDLen)
		}
	}

	// Parse acks (INT16) and timeout_ms (INT32)
	if len(requestBody) < offset+6 {
		return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
	}

	acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
	offset += 2
	_ = binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	// Remember if this is fire-and-forget mode
	isFireAndForget := acks == 0

	if len(requestBody) < offset+4 {
		return nil, fmt.Errorf("Produce v%d request missing topics count", apiVersion)
	}
	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	// If topicsCount is implausible, there might be a parsing issue
	if topicsCount > 1000 {
		return nil, fmt.Errorf("Produce v%d request has implausible topics count: %d", apiVersion, topicsCount)
	}

	// Build response
	response := make([]byte, 0, 256)

	// NOTE: Correlation ID is handled by writeResponseWithHeader
	// Do NOT include it in the response body

	// Topics array length (first field in response body)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
	response = append(response, topicsCountBytes...)

	// Process each topic with correct parsing and response format
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		// Parse topic name
		if len(requestBody) < offset+2 {
			break
		}

		topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		if len(requestBody) < offset+int(topicNameSize)+4 {
			break
		}

		topicName := string(requestBody[offset : offset+int(topicNameSize)])
		offset += int(topicNameSize)

		// Parse partitions count
		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4

		// Response: topic name (STRING: 2 bytes length + data)
		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
		response = append(response, []byte(topicName)...)

		// Response: partitions count (4 bytes)
		partitionsCountBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
		response = append(response, partitionsCountBytes...)

		// Process each partition with correct parsing
		for j := uint32(0); j < partitionsCount && offset < len(requestBody); j++ {
			// Parse partition request: partition_id(4) + record_set_size(4) + record_set_data
			if len(requestBody) < offset+8 {
				break
			}
			partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4
			recordSetSize := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4
			if len(requestBody) < offset+int(recordSetSize) {
				break
			}
			// CRITICAL FIX: Make a copy of recordSetData to prevent buffer sharing corruption
			// The slice requestBody[offset:offset+int(recordSetSize)] shares the underlying array
			// with the request buffer, which can be reused and cause data corruption
			recordSetData := make([]byte, recordSetSize)
			copy(recordSetData, requestBody[offset:offset+int(recordSetSize)])
			offset += int(recordSetSize)

			// Process the record set and store in ledger
			var errorCode uint16 = 0
			var baseOffset int64 = 0
			currentTime := time.Now().UnixNano()

			// Check if topic exists; for v2+ do NOT auto-create
			topicExists := h.seaweedMQHandler.TopicExists(topicName)

			if !topicExists {
				errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
			} else {
				// Process the record set (lenient parsing)
				recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused

				if parseErr != nil {
					errorCode = 42 // INVALID_RECORD
				} else if recordCount > 0 {
					// Extract all records from the record set and publish each one
					// extractAllRecords handles fallback internally for various cases
					records := h.extractAllRecords(recordSetData)

					if len(records) == 0 {
						errorCode = 42 // INVALID_RECORD
					} else {
						for idx, kv := range records {
							offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)

							if prodErr != nil {
								// Check if this is a schema validation error and add delay to prevent overloading
								if h.isSchemaValidationError(prodErr) {
									time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
								}
								errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
								break
							}

							if idx == 0 {
								baseOffset = offsetProduced
							}
						}
					}
				} else {
					// Try to extract anyway - this might be a Noop record
					records := h.extractAllRecords(recordSetData)
					if len(records) > 0 {
						for idx, kv := range records {
							offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
							if prodErr != nil {
								errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
								break
							}
							if idx == 0 {
								baseOffset = offsetProduced
							}
						}
					}
				}
			}

			// Build correct Produce v2+ response for this partition
			// Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]

			// partition_id (4 bytes)
			partitionIDBytes := make([]byte, 4)
			binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
			response = append(response, partitionIDBytes...)

			// error_code (2 bytes)
			response = append(response, byte(errorCode>>8), byte(errorCode))

			// base_offset (8 bytes) - offset of first message
			baseOffsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
			response = append(response, baseOffsetBytes...)

			// log_append_time (8 bytes) - v2+ field (actual timestamp, not -1)
			if apiVersion >= 2 {
				logAppendTimeBytes := make([]byte, 8)
				binary.BigEndian.PutUint64(logAppendTimeBytes, uint64(currentTime))
				response = append(response, logAppendTimeBytes...)
			}

			// log_start_offset (8 bytes) - v5+ field
			if apiVersion >= 5 {
				logStartOffsetBytes := make([]byte, 8)
				binary.BigEndian.PutUint64(logStartOffsetBytes, uint64(baseOffset))
				response = append(response, logStartOffsetBytes...)
			}
		}
	}

	// For fire-and-forget mode, return empty response after processing
	if isFireAndForget {
		return []byte{}, nil
	}

	// Append throttle_time_ms at the END for v1+ (as per original Kafka protocol)
	if apiVersion >= 1 {
		response = append(response, 0, 0, 0, 0) // throttle_time_ms = 0
	}

	return response, nil
}

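// Illustrative v5 per-partition response body as built above, for partition 0 with
// no error, base_offset 42, and log_start_offset 42 (log_append_time shown as a
// placeholder, since it is the UnixNano timestamp taken at append time):
//
//	00 00 00 00                 partition_id = 0
//	00 00                       error_code = 0
//	00 00 00 00 00 00 00 2a     base_offset = 42
//	tt tt tt tt tt tt tt tt     log_append_time (v2+)
//	00 00 00 00 00 00 00 2a     log_start_offset = 42 (v5+)
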
// performSchemaValidation performs comprehensive schema validation for a topic
func (h *Handler) performSchemaValidation(topicName string, schemaID uint32, messageFormat schema.Format, messageBytes []byte) error {
	// 1. Check if topic is configured to require schemas
	if !h.isSchematizedTopic(topicName) {
		// Topic doesn't require schemas, but message is schematized - this is allowed
		return nil
	}

	// 2. Get expected schema metadata for the topic
	expectedMetadata, err := h.getSchemaMetadataForTopic(topicName)
	if err != nil {
		// No expected schema found - in strict mode this would be an error
		// In permissive mode, allow any valid schema
		if h.isStrictSchemaValidation() {
			// Add delay before returning schema validation error to prevent overloading
			time.Sleep(100 * time.Millisecond)
			return fmt.Errorf("topic %s requires schema but no expected schema found: %w", topicName, err)
		}
		return nil
	}

	// 3. Validate schema ID matches expected schema
	expectedSchemaID, err := h.parseSchemaID(expectedMetadata["schema_id"])
	if err != nil {
		// Add delay before returning schema validation error to prevent overloading
		time.Sleep(100 * time.Millisecond)
		return fmt.Errorf("invalid expected schema ID for topic %s: %w", topicName, err)
	}

	// 4. Check schema compatibility
	if schemaID != expectedSchemaID {
		// Schema ID doesn't match - check if it's a compatible evolution
		compatible, err := h.checkSchemaEvolution(topicName, expectedSchemaID, schemaID, messageFormat)
		if err != nil {
			// Add delay before returning schema validation error to prevent overloading
			time.Sleep(100 * time.Millisecond)
			return fmt.Errorf("failed to check schema evolution for topic %s: %w", topicName, err)
		}
		if !compatible {
			// Add delay before returning schema validation error to prevent overloading
			time.Sleep(100 * time.Millisecond)
			return fmt.Errorf("schema ID %d is not compatible with expected schema %d for topic %s",
				schemaID, expectedSchemaID, topicName)
		}
	}

	// 5. Validate message format matches expected format
	expectedFormatStr := expectedMetadata["schema_format"]
	var expectedFormat schema.Format
	switch expectedFormatStr {
	case "AVRO":
		expectedFormat = schema.FormatAvro
	case "PROTOBUF":
		expectedFormat = schema.FormatProtobuf
	case "JSON_SCHEMA":
		expectedFormat = schema.FormatJSONSchema
	default:
		expectedFormat = schema.FormatUnknown
	}
	if messageFormat != expectedFormat {
		return fmt.Errorf("message format %s does not match expected format %s for topic %s",
			messageFormat, expectedFormat, topicName)
	}

	// 6. Perform message-level validation
	return h.validateMessageContent(schemaID, messageFormat, messageBytes)
}

// checkSchemaEvolution checks if a schema evolution is compatible
func (h *Handler) checkSchemaEvolution(topicName string, expectedSchemaID, actualSchemaID uint32, format schema.Format) (bool, error) {
	// Get both schemas
	expectedSchema, err := h.schemaManager.GetSchemaByID(expectedSchemaID)
	if err != nil {
		return false, fmt.Errorf("failed to get expected schema %d: %w", expectedSchemaID, err)
	}

	actualSchema, err := h.schemaManager.GetSchemaByID(actualSchemaID)
	if err != nil {
		return false, fmt.Errorf("failed to get actual schema %d: %w", actualSchemaID, err)
	}

	// Since we're accessing schema from registry for this topic, ensure topic config is updated
	h.ensureTopicSchemaFromRegistryCache(topicName, expectedSchema, actualSchema)

	// Check compatibility based on topic's compatibility level
	compatibilityLevel := h.getTopicCompatibilityLevel(topicName)

	result, err := h.schemaManager.CheckSchemaCompatibility(
		expectedSchema.Schema,
		actualSchema.Schema,
		format,
		compatibilityLevel,
	)
	if err != nil {
		return false, fmt.Errorf("failed to check schema compatibility: %w", err)
	}

	return result.Compatible, nil
}

// validateMessageContent validates the message content against its schema
func (h *Handler) validateMessageContent(schemaID uint32, format schema.Format, messageBytes []byte) error {
	// Decode the message to validate it can be parsed correctly
	_, err := h.schemaManager.DecodeMessage(messageBytes)
	if err != nil {
		return fmt.Errorf("message validation failed for schema %d: %w", schemaID, err)
	}

	// Additional format-specific validation could be added here
	switch format {
	case schema.FormatAvro:
		return h.validateAvroMessage(schemaID, messageBytes)
	case schema.FormatProtobuf:
		return h.validateProtobufMessage(schemaID, messageBytes)
	case schema.FormatJSONSchema:
		return h.validateJSONSchemaMessage(schemaID, messageBytes)
	default:
		return fmt.Errorf("unsupported schema format for validation: %s", format)
	}
}

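// For reference, assuming ParseConfluentEnvelope follows the standard Confluent wire
// format, the envelope used by the validators below is: magic byte 0x00, a 4-byte
// big-endian schema ID, then the encoded payload. Example for schema ID 7 wrapping an
// Avro body: 00 00 00 00 07 <avro bytes>.
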
// validateAvroMessage performs Avro-specific validation
func (h *Handler) validateAvroMessage(schemaID uint32, messageBytes []byte) error {
	// Basic validation is already done in DecodeMessage
	// Additional Avro-specific validation could be added here
	return nil
}

// validateProtobufMessage performs Protobuf-specific validation
func (h *Handler) validateProtobufMessage(schemaID uint32, messageBytes []byte) error {
	// Get the schema for additional validation
	cachedSchema, err := h.schemaManager.GetSchemaByID(schemaID)
	if err != nil {
		return fmt.Errorf("failed to get Protobuf schema %d: %w", schemaID, err)
	}

	// Parse the schema to get the descriptor
	parser := schema.NewProtobufDescriptorParser()
	protobufSchema, err := parser.ParseBinaryDescriptor([]byte(cachedSchema.Schema), "")
	if err != nil {
		return fmt.Errorf("failed to parse Protobuf schema: %w", err)
	}

	// Validate message against schema
	envelope, ok := schema.ParseConfluentEnvelope(messageBytes)
	if !ok {
		return fmt.Errorf("invalid Confluent envelope")
	}

	return protobufSchema.ValidateMessage(envelope.Payload)
}

								// validateJSONSchemaMessage performs JSON Schema-specific validation
							 | 
						|
								func (h *Handler) validateJSONSchemaMessage(schemaID uint32, messageBytes []byte) error {
							 | 
						|
									// Get the schema for validation
							 | 
						|
									cachedSchema, err := h.schemaManager.GetSchemaByID(schemaID)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to get JSON schema %d: %w", schemaID, err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create JSON Schema decoder for validation
							 | 
						|
									decoder, err := schema.NewJSONSchemaDecoder(cachedSchema.Schema)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create JSON Schema decoder: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Parse envelope and validate payload
							 | 
						|
									envelope, ok := schema.ParseConfluentEnvelope(messageBytes)
							 | 
						|
									if !ok {
							 | 
						|
										return fmt.Errorf("invalid Confluent envelope")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Validate JSON payload against schema
							 | 
						|
									_, err = decoder.Decode(envelope.Payload)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("JSON Schema validation failed: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Helper methods for configuration
							 | 
						|
								
							 | 
						|
								// isSchemaValidationError checks if an error is related to schema validation
							 | 
						|
								func (h *Handler) isSchemaValidationError(err error) bool {
							 | 
						|
									if err == nil {
							 | 
						|
										return false
							 | 
						|
									}
							 | 
						|
									errStr := strings.ToLower(err.Error())
							 | 
						|
									return strings.Contains(errStr, "schema") ||
							 | 
						|
										strings.Contains(errStr, "decode") ||
							 | 
						|
										strings.Contains(errStr, "validation") ||
							 | 
						|
										strings.Contains(errStr, "registry") ||
							 | 
						|
										strings.Contains(errStr, "avro") ||
							 | 
						|
										strings.Contains(errStr, "protobuf") ||
							 | 
						|
										strings.Contains(errStr, "json schema")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// isStrictSchemaValidation returns whether strict schema validation is enabled
							 | 
						|
								func (h *Handler) isStrictSchemaValidation() bool {
							 | 
						|
									// This could be configurable per topic or globally
							 | 
						|
									// For now, default to permissive mode
							 | 
						|
									return false
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// getTopicCompatibilityLevel returns the compatibility level for a topic
							 | 
						|
								func (h *Handler) getTopicCompatibilityLevel(topicName string) schema.CompatibilityLevel {
							 | 
						|
									// This could be configurable per topic
							 | 
						|
									// For now, default to backward compatibility
							 | 
						|
									return schema.CompatibilityBackward
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// parseSchemaID parses a schema ID from string
							 | 
						|
								func (h *Handler) parseSchemaID(schemaIDStr string) (uint32, error) {
							 | 
						|
									if schemaIDStr == "" {
							 | 
						|
										return 0, fmt.Errorf("empty schema ID")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									var schemaID uint64
							 | 
						|
									if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil {
							 | 
						|
										return 0, fmt.Errorf("invalid schema ID format: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if schemaID > 0xFFFFFFFF {
							 | 
						|
										return 0, fmt.Errorf("schema ID too large: %d", schemaID)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return uint32(schemaID), nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// isSystemTopic checks if a topic should bypass schema processing
							 | 
						|
								func (h *Handler) isSystemTopic(topicName string) bool {
							 | 
						|
									// System topics that should be stored as-is without schema processing
							 | 
						|
									systemTopics := []string{
							 | 
						|
										"_schemas",            // Schema Registry topic
							 | 
						|
										"__consumer_offsets",  // Kafka consumer offsets topic
							 | 
						|
										"__transaction_state", // Kafka transaction state topic
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									for _, systemTopic := range systemTopics {
							 | 
						|
										if topicName == systemTopic {
							 | 
						|
											return true
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Also check for topics with system prefixes
							 | 
						|
									return strings.HasPrefix(topicName, "_") || strings.HasPrefix(topicName, "__")
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
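// The produce path below chooses one of four storage strategies:
//
//	1. system topics               -> stored raw, schema processing bypassed
//	2. schema management disabled  -> stored raw
//	3. no Confluent magic byte     -> stored raw (plain Kafka messages)
//	4. schematized key and/or value -> decoded into a combined RecordValue
//	   and stored via ProduceRecordValue
//
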
// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
// ctx controls the publish timeout - if the client cancels, the produce operation is cancelled
func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
	// System topics should always bypass schema processing and be stored as-is
	if h.isSystemTopic(topic) {
		offset, err := h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
		return offset, err
	}

	// If schema management is not enabled, fall back to raw message handling
	isEnabled := h.IsSchemaEnabled()
	if !isEnabled {
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
	}

	var keyDecodedMsg *schema.DecodedMessage
	var valueDecodedMsg *schema.DecodedMessage

	// Check and decode key if schematized
	if key != nil {
		isSchematized := h.schemaManager.IsSchematized(key)
		if isSchematized {
			var err error
			keyDecodedMsg, err = h.schemaManager.DecodeMessage(key)
			if err != nil {
				// Add delay before returning schema decoding error to prevent overloading
				time.Sleep(100 * time.Millisecond)
				return 0, fmt.Errorf("failed to decode schematized key: %w", err)
			}
		}
	}

	// Check and decode value if schematized
	if len(value) > 0 {
		isSchematized := h.schemaManager.IsSchematized(value)
		if isSchematized {
			var err error
			valueDecodedMsg, err = h.schemaManager.DecodeMessage(value)
			if err != nil {
				// If message has schema ID (magic byte 0x00), decoding MUST succeed
				// Do not fall back to raw storage - this would corrupt the data model
				time.Sleep(100 * time.Millisecond)
				return 0, fmt.Errorf("message has schema ID but decoding failed (schema registry may be unavailable): %w", err)
			}
		}
	}

	// If neither key nor value is schematized, fall back to raw message handling
	// This is OK for non-schematized messages (no magic byte 0x00)
	if keyDecodedMsg == nil && valueDecodedMsg == nil {
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
	}

	// Process key schema if present
	if keyDecodedMsg != nil {
		// Store key schema information in memory cache for fetch path performance
		if !h.hasTopicKeySchemaConfig(topic, keyDecodedMsg.SchemaID, keyDecodedMsg.SchemaFormat) {
			err := h.storeTopicKeySchemaConfig(topic, keyDecodedMsg.SchemaID, keyDecodedMsg.SchemaFormat)
			if err != nil {
				// Log error but don't fail the produce
			}

			// Schedule key schema registration in background (leader-only, non-blocking)
			h.scheduleKeySchemaRegistration(topic, keyDecodedMsg.RecordType)
		}
	}

	// Process value schema if present and create combined RecordValue with key fields
	var recordValueBytes []byte
	if valueDecodedMsg != nil {
		// Create combined RecordValue that includes both key and value fields
		combinedRecordValue := h.createCombinedRecordValue(keyDecodedMsg, valueDecodedMsg)

		// Store the combined RecordValue - schema info is stored in topic configuration
		var err error
		recordValueBytes, err = proto.Marshal(combinedRecordValue)
		if err != nil {
			return 0, fmt.Errorf("failed to marshal combined RecordValue: %w", err)
		}

		// Store value schema information in memory cache for fetch path performance
		// Only store if not already cached to avoid mutex contention on hot path
		hasConfig := h.hasTopicSchemaConfig(topic, valueDecodedMsg.SchemaID, valueDecodedMsg.SchemaFormat)
		if !hasConfig {
			err = h.storeTopicSchemaConfig(topic, valueDecodedMsg.SchemaID, valueDecodedMsg.SchemaFormat)
			if err != nil {
				// Log error but don't fail the produce
			}

			// Schedule value schema registration in background (leader-only, non-blocking)
			h.scheduleSchemaRegistration(topic, valueDecodedMsg.RecordType)
		}
	} else if keyDecodedMsg != nil {
		// If only key is schematized, create RecordValue with just key fields
		combinedRecordValue := h.createCombinedRecordValue(keyDecodedMsg, nil)

		var err error
		recordValueBytes, err = proto.Marshal(combinedRecordValue)
		if err != nil {
			return 0, fmt.Errorf("failed to marshal key-only RecordValue: %w", err)
		}
	} else {
		// If value is not schematized, use raw value
		recordValueBytes = value
	}

	// Prepare final key for storage
	finalKey := key
	if keyDecodedMsg != nil {
		// If key was schematized, convert back to raw bytes for storage
		keyBytes, err := proto.Marshal(keyDecodedMsg.RecordValue)
		if err != nil {
			return 0, fmt.Errorf("failed to marshal key RecordValue: %w", err)
		}
		finalKey = keyBytes
	}

	// Send to SeaweedMQ
	if valueDecodedMsg != nil || keyDecodedMsg != nil {
		// Store the DECODED RecordValue (not the original Confluent Wire Format)
		// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
		// which can be re-encoded to Confluent Wire Format during fetch if needed
		return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
	} else {
		// Send with raw format for non-schematized data
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
	}
}

// hasTopicSchemaConfig checks if schema config already exists (read-only, fast path)
func (h *Handler) hasTopicSchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) bool {
	h.topicSchemaConfigMu.RLock()
	defer h.topicSchemaConfigMu.RUnlock()

	if h.topicSchemaConfigs == nil {
		return false
	}

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		return false
	}

	// Check if the schema matches (avoid re-registration of same schema)
	return config.ValueSchemaID == schemaID && config.ValueSchemaFormat == schemaFormat
}

// storeTopicSchemaConfig stores original Kafka schema metadata (ID + format) for fetch path
// This is kept in memory for performance when reconstructing Confluent messages during fetch.
// The translated RecordType is persisted via background schema registration.
func (h *Handler) storeTopicSchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) error {
	// Store in memory cache for quick access during fetch operations
	h.topicSchemaConfigMu.Lock()
	defer h.topicSchemaConfigMu.Unlock()

	if h.topicSchemaConfigs == nil {
		h.topicSchemaConfigs = make(map[string]*TopicSchemaConfig)
	}

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		config = &TopicSchemaConfig{}
		h.topicSchemaConfigs[topic] = config
	}

	config.ValueSchemaID = schemaID
	config.ValueSchemaFormat = schemaFormat

	return nil
}

// storeTopicKeySchemaConfig stores key schema configuration
func (h *Handler) storeTopicKeySchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) error {
	h.topicSchemaConfigMu.Lock()
	defer h.topicSchemaConfigMu.Unlock()

	if h.topicSchemaConfigs == nil {
		h.topicSchemaConfigs = make(map[string]*TopicSchemaConfig)
	}

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		config = &TopicSchemaConfig{}
		h.topicSchemaConfigs[topic] = config
	}

	config.KeySchemaID = schemaID
	config.KeySchemaFormat = schemaFormat
	config.HasKeySchema = true

	return nil
}

// hasTopicKeySchemaConfig checks if key schema config already exists
func (h *Handler) hasTopicKeySchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) bool {
	h.topicSchemaConfigMu.RLock()
	defer h.topicSchemaConfigMu.RUnlock()

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		return false
	}

	// Check if the key schema matches
	return config.HasKeySchema && config.KeySchemaID == schemaID && config.KeySchemaFormat == schemaFormat
}

// scheduleSchemaRegistration registers value schema once per topic-schema combination
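// Registrations are de-duplicated in memory using a key of the form
// "<topic>:value:<recordTypeHash>" (see getRecordTypeHash); on failure the key
// is removed again so the registration can be retried on a later produce.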
func (h *Handler) scheduleSchemaRegistration(topicName string, recordType *schema_pb.RecordType) {
	if recordType == nil {
		return
	}

	// Create a unique key for this value schema registration
	schemaKey := fmt.Sprintf("%s:value:%d", topicName, h.getRecordTypeHash(recordType))

	// Check if already registered
	h.registeredSchemasMu.RLock()
	if h.registeredSchemas[schemaKey] {
		h.registeredSchemasMu.RUnlock()
		return // Already registered
	}
	h.registeredSchemasMu.RUnlock()

	// Double-check with write lock to prevent race condition
	h.registeredSchemasMu.Lock()
	defer h.registeredSchemasMu.Unlock()

	if h.registeredSchemas[schemaKey] {
		return // Already registered by another goroutine
	}

	// Mark as registered before attempting registration
	h.registeredSchemas[schemaKey] = true

	// Perform synchronous registration
	if err := h.registerSchemasViaBrokerAPI(topicName, recordType, nil); err != nil {
		// Remove from registered map on failure so it can be retried
		delete(h.registeredSchemas, schemaKey)
	}
}

// scheduleKeySchemaRegistration registers key schema once per topic-schema combination
func (h *Handler) scheduleKeySchemaRegistration(topicName string, recordType *schema_pb.RecordType) {
	if recordType == nil {
		return
	}

	// Create a unique key for this key schema registration
	schemaKey := fmt.Sprintf("%s:key:%d", topicName, h.getRecordTypeHash(recordType))

	// Check if already registered
	h.registeredSchemasMu.RLock()
	if h.registeredSchemas[schemaKey] {
		h.registeredSchemasMu.RUnlock()
		return // Already registered
	}
	h.registeredSchemasMu.RUnlock()

	// Double-check with write lock to prevent race condition
	h.registeredSchemasMu.Lock()
	defer h.registeredSchemasMu.Unlock()

	if h.registeredSchemas[schemaKey] {
		return // Already registered by another goroutine
	}

	// Mark as registered before attempting registration
	h.registeredSchemas[schemaKey] = true

	// Register key schema to the same topic (not a phantom "-key" topic)
	// This uses the extended ConfigureTopicRequest with separate key/value RecordTypes
	if err := h.registerSchemasViaBrokerAPI(topicName, nil, recordType); err != nil {
		// Remove from registered map on failure so it can be retried
		delete(h.registeredSchemas, schemaKey)
	}
}

// ensureTopicSchemaFromRegistryCache ensures topic configuration is updated when schemas are retrieved from registry
func (h *Handler) ensureTopicSchemaFromRegistryCache(topicName string, schemas ...*schema.CachedSchema) {
	if len(schemas) == 0 {
		return
	}

	// Use the latest/most relevant schema (last one in the list)
	latestSchema := schemas[len(schemas)-1]
	if latestSchema == nil {
		return
	}

	// Try to infer RecordType from the cached schema
	recordType, err := h.inferRecordTypeFromCachedSchema(latestSchema)
	if err != nil {
		return
	}

	// Schedule schema registration to update topic.conf
	if recordType != nil {
		h.scheduleSchemaRegistration(topicName, recordType)
	}
}

// ensureTopicKeySchemaFromRegistryCache ensures topic configuration is updated when key schemas are retrieved from registry
func (h *Handler) ensureTopicKeySchemaFromRegistryCache(topicName string, schemas ...*schema.CachedSchema) {
	if len(schemas) == 0 {
		return
	}

	// Use the latest/most relevant schema (last one in the list)
	latestSchema := schemas[len(schemas)-1]
	if latestSchema == nil {
		return
	}

	// Try to infer RecordType from the cached schema
	recordType, err := h.inferRecordTypeFromCachedSchema(latestSchema)
	if err != nil {
		return
	}

	// Schedule key schema registration to update topic.conf
	if recordType != nil {
		h.scheduleKeySchemaRegistration(topicName, recordType)
	}
}

// getRecordTypeHash generates a simple hash for RecordType to use as a key
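// Note: only the field count and the first field name are mixed in, so two
// different record types can share a hash; the value is only used as a cheap
// de-duplication key for background schema registrations.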
func (h *Handler) getRecordTypeHash(recordType *schema_pb.RecordType) uint32 {
	if recordType == nil {
		return 0
	}

	// Simple hash based on field count and first field name
	hash := uint32(len(recordType.Fields))
	if len(recordType.Fields) > 0 {
		// Use first field name for additional uniqueness
		firstFieldName := recordType.Fields[0].Name
		for _, char := range firstFieldName {
			hash = hash*31 + uint32(char)
		}
	}

	return hash
}

// createCombinedRecordValue creates a RecordValue that combines fields from both key and value decoded messages
// Key fields are prefixed with "key_" to distinguish them from value fields
// The message key bytes are stored in the _key system column (from logEntry.Key)
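// For example (hypothetical field names), a key record {user_id: 42} combined
// with a value record {amount: 9.99} yields {key_user_id: 42, amount: 9.99}.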
func (h *Handler) createCombinedRecordValue(keyDecodedMsg *schema.DecodedMessage, valueDecodedMsg *schema.DecodedMessage) *schema_pb.RecordValue {
	combinedFields := make(map[string]*schema_pb.Value)

	// Add key fields with "key_" prefix
	if keyDecodedMsg != nil && keyDecodedMsg.RecordValue != nil {
		for fieldName, fieldValue := range keyDecodedMsg.RecordValue.Fields {
			combinedFields["key_"+fieldName] = fieldValue
		}
		// Note: The message key bytes are stored in the _key system column (from logEntry.Key)
		// We don't create a "key" field here to avoid redundancy
	}

	// Add value fields (no prefix)
	if valueDecodedMsg != nil && valueDecodedMsg.RecordValue != nil {
		for fieldName, fieldValue := range valueDecodedMsg.RecordValue.Fields {
			combinedFields[fieldName] = fieldValue
		}
	}

	return &schema_pb.RecordValue{
		Fields: combinedFields,
	}
}

// inferRecordTypeFromCachedSchema attempts to infer RecordType from a cached schema
func (h *Handler) inferRecordTypeFromCachedSchema(cachedSchema *schema.CachedSchema) (*schema_pb.RecordType, error) {
	if cachedSchema == nil {
		return nil, fmt.Errorf("cached schema is nil")
	}

	switch cachedSchema.Format {
	case schema.FormatAvro:
		return h.inferRecordTypeFromAvroSchema(cachedSchema.Schema)
	case schema.FormatProtobuf:
		return h.inferRecordTypeFromProtobufSchema(cachedSchema.Schema)
	case schema.FormatJSONSchema:
		return h.inferRecordTypeFromJSONSchema(cachedSchema.Schema)
	default:
		return nil, fmt.Errorf("unsupported schema format for inference: %v", cachedSchema.Format)
	}
}

// inferRecordTypeFromAvroSchema infers RecordType from Avro schema string
// Uses cache to avoid recreating expensive Avro codecs (17% CPU overhead!)
func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.RecordType, error) {
	// Check cache first
	h.inferredRecordTypesMu.RLock()
	if recordType, exists := h.inferredRecordTypes[avroSchema]; exists {
		h.inferredRecordTypesMu.RUnlock()
		return recordType, nil
	}
	h.inferredRecordTypesMu.RUnlock()

	// Cache miss - create decoder and infer type
	decoder, err := schema.NewAvroDecoder(avroSchema)
	if err != nil {
		return nil, fmt.Errorf("failed to create Avro decoder: %w", err)
	}

	recordType, err := decoder.InferRecordType()
	if err != nil {
		return nil, err
	}

	// Cache the result
	h.inferredRecordTypesMu.Lock()
	h.inferredRecordTypes[avroSchema] = recordType
	h.inferredRecordTypesMu.Unlock()

	return recordType, nil
}

// inferRecordTypeFromProtobufSchema infers RecordType from Protobuf schema
// Uses cache to avoid recreating expensive decoders
func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*schema_pb.RecordType, error) {
	// Check cache first
	cacheKey := "protobuf:" + protobufSchema
	h.inferredRecordTypesMu.RLock()
	if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
		h.inferredRecordTypesMu.RUnlock()
		return recordType, nil
	}
	h.inferredRecordTypesMu.RUnlock()

	// Cache miss - create decoder and infer type
	decoder, err := schema.NewProtobufDecoder([]byte(protobufSchema))
	if err != nil {
		return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err)
	}

	recordType, err := decoder.InferRecordType()
	if err != nil {
		return nil, err
	}

	// Cache the result
	h.inferredRecordTypesMu.Lock()
	h.inferredRecordTypes[cacheKey] = recordType
	h.inferredRecordTypesMu.Unlock()

	return recordType, nil
}

// inferRecordTypeFromJSONSchema infers RecordType from JSON Schema string
// Uses cache to avoid recreating expensive decoders
func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.RecordType, error) {
	// Check cache first
	cacheKey := "json:" + jsonSchema
	h.inferredRecordTypesMu.RLock()
	if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
		h.inferredRecordTypesMu.RUnlock()
		return recordType, nil
	}
	h.inferredRecordTypesMu.RUnlock()

	// Cache miss - create decoder and infer type
	decoder, err := schema.NewJSONSchemaDecoder(jsonSchema)
	if err != nil {
		return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err)
	}

	recordType, err := decoder.InferRecordType()
	if err != nil {
		return nil, err
	}

	// Cache the result
	h.inferredRecordTypesMu.Lock()
	h.inferredRecordTypes[cacheKey] = recordType
	h.inferredRecordTypesMu.Unlock()

	return recordType, nil
}