diff --git a/test/kafka/sarama_simple_test.go b/test/kafka/sarama_simple_test.go
index 5ba7c1560..a678c03e7 100644
--- a/test/kafka/sarama_simple_test.go
+++ b/test/kafka/sarama_simple_test.go
@@ -182,7 +182,7 @@ func TestSaramaProduceConsume(t *testing.T) {
     // Test messages
     testMessages := []string{
         "Sarama Producer Message 1",
-        "Sarama Producer Message 2",
+        "Sarama Producer Message 2",
         "Sarama Producer Message 3",
     }
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index b7d0f0fd1..29ad2eb0e 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -3,6 +3,7 @@ package protocol
 import (
     "encoding/binary"
     "fmt"
+    "hash/crc32"
     "time"
 
     "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
@@ -30,11 +31,8 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
         response = append(response, 0, 0, 0, 0) // throttle_time_ms (4 bytes, 0 = no throttling)
     }
 
-    // Fetch v4+ has session_id and error_code
-    if apiVersion >= 4 {
-        response = append(response, 0, 0)       // error_code (2 bytes, 0 = no error)
-        response = append(response, 0, 0, 0, 0) // session_id (4 bytes, 0 for now)
-    }
+    // Fetch v4+ has different format - no error_code and session_id at top level
+    // The session_id and error_code are handled differently in v4+
 
     // Topics count
     topicsCount := len(fetchRequest.Topics)
@@ -45,7 +43,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
     // Process each requested topic
     for _, topic := range fetchRequest.Topics {
         topicNameBytes := []byte(topic.Name)
-
+
         // Topic name length and name
         response = append(response, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes)))
         response = append(response, topicNameBytes...)
@@ -69,6 +67,9 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
             // Get ledger for this topic-partition to determine high water mark
             ledger := h.GetOrCreateLedger(topic.Name, partition.PartitionID)
             highWaterMark := ledger.GetHighWaterMark()
+
+            fmt.Printf("DEBUG: Fetch - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d\n",
+                topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark)
 
             // High water mark (8 bytes)
             highWaterMarkBytes := make([]byte, 8)
@@ -88,14 +89,27 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
                 response = append(response, 0, 0, 0, 0) // aborted_transactions count (4 bytes) = 0
             }
 
-            // Records - construct record batch with actual messages
-            recordBatch := h.constructRecordBatchFromLedger(ledger, partition.FetchOffset, highWaterMark)
-
+            // Records - get actual stored record batches
+            var recordBatch []byte
+            if highWaterMark > partition.FetchOffset {
+                // Try to get the actual stored record batch first
+                if storedBatch, exists := h.GetRecordBatch(topic.Name, partition.PartitionID, partition.FetchOffset); exists {
+                    recordBatch = storedBatch
+                    fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(storedBatch))
+                } else {
+                    // Fallback to synthetic batch if no stored batch found
+                    recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
+                    fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
+                }
+            } else {
+                recordBatch = []byte{} // No messages available
+            }
+
             // Records size (4 bytes)
             recordsSizeBytes := make([]byte, 4)
             binary.BigEndian.PutUint32(recordsSizeBytes, uint32(len(recordBatch)))
             response = append(response, recordsSizeBytes...)
-
+
             // Records data
             response = append(response, recordBatch...)
 
@@ -123,10 +137,10 @@ type FetchTopic struct {
 }
 
 type FetchPartition struct {
-    PartitionID int32
-    FetchOffset int64
+    PartitionID    int32
+    FetchOffset    int64
     LogStartOffset int64
-    MaxBytes int32
+    MaxBytes       int32
 }
 
 // parseFetchRequest parses a Kafka Fetch request
@@ -144,7 +158,7 @@ func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*Fet
     }
     clientIDLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
     offset += 2
-
+
     if clientIDLength >= 0 {
         if offset+clientIDLength > len(requestBody) {
             return nil, fmt.Errorf("insufficient data for client_id")
@@ -282,7 +296,7 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset
     offsetLedger, ok := ledger.(interface {
         GetMessages(startOffset, endOffset int64) []interface{}
     })
-
+
     if !ok {
         // If ledger doesn't support GetMessages, return empty batch
         return []byte{}
@@ -362,7 +376,7 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset
     for i, msg := range messages {
         // Try to extract key and value from the message
        var key, value []byte
-
+
         // Handle different message types
         if msgMap, ok := msg.(map[string]interface{}); ok {
             if keyVal, exists := msgMap["key"]; exists {
@@ -428,6 +442,124 @@ func (h *Handler) constructRecordBatchFromLedger(ledger interface{}, fetchOffset
     return batch
 }
 
+// constructSimpleRecordBatch creates a simple record batch for testing
+func (h *Handler) constructSimpleRecordBatch(fetchOffset, highWaterMark int64) []byte {
+    recordsToFetch := highWaterMark - fetchOffset
+    if recordsToFetch <= 0 {
+        return []byte{} // no records to fetch
+    }
+
+    // Limit the number of records for testing
+    if recordsToFetch > 10 {
+        recordsToFetch = 10
+    }
+
+    // Create a simple record batch
+    batch := make([]byte, 0, 512)
+
+    // Record batch header
+    baseOffsetBytes := make([]byte, 8)
+    binary.BigEndian.PutUint64(baseOffsetBytes, uint64(fetchOffset))
+    batch = append(batch, baseOffsetBytes...) // base offset (8 bytes)
+
+    // Calculate batch length (will be filled after we know the size)
+    batchLengthPos := len(batch)
+    batch = append(batch, 0, 0, 0, 0) // batch length placeholder (4 bytes)
+
+    batch = append(batch, 0, 0, 0, 0) // partition leader epoch (4 bytes)
+    batch = append(batch, 2)          // magic byte (version 2) (1 byte)
+
+    // CRC placeholder (4 bytes) - will be calculated at the end
+    crcPos := len(batch)
+    batch = append(batch, 0, 0, 0, 0) // CRC32 placeholder
+
+    // Batch attributes (2 bytes) - no compression, no transactional
+    batch = append(batch, 0, 0) // attributes
+
+    // Last offset delta (4 bytes)
+    lastOffsetDelta := uint32(recordsToFetch - 1)
+    lastOffsetDeltaBytes := make([]byte, 4)
+    binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta)
+    batch = append(batch, lastOffsetDeltaBytes...)
+
+    // First timestamp (8 bytes)
+    firstTimestamp := time.Now().UnixMilli()
+    firstTimestampBytes := make([]byte, 8)
+    binary.BigEndian.PutUint64(firstTimestampBytes, uint64(firstTimestamp))
+    batch = append(batch, firstTimestampBytes...)
+
+    // Max timestamp (8 bytes)
+    maxTimestamp := firstTimestamp + recordsToFetch - 1
+    maxTimestampBytes := make([]byte, 8)
+    binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp))
+    batch = append(batch, maxTimestampBytes...)
+
+    // Producer ID (8 bytes) - -1 for non-transactional
+    batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
+
+    // Producer Epoch (2 bytes) - -1 for non-transactional
+    batch = append(batch, 0xFF, 0xFF)
+
+    // Base Sequence (4 bytes) - -1 for non-transactional
+    batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
+
+    // Record count (4 bytes)
+    recordCountBytes := make([]byte, 4)
+    binary.BigEndian.PutUint32(recordCountBytes, uint32(recordsToFetch))
+    batch = append(batch, recordCountBytes...)
+
+    // Add individual records
+    for i := int64(0); i < recordsToFetch; i++ {
+        // Create test message
+        key := []byte(fmt.Sprintf("key-%d", fetchOffset+i))
+        value := []byte(fmt.Sprintf("Test message %d", fetchOffset+i))
+
+        // Build individual record
+        record := make([]byte, 0, 128)
+
+        // Record attributes (1 byte)
+        record = append(record, 0)
+
+        // Timestamp delta (varint)
+        timestampDelta := i
+        record = append(record, encodeVarint(timestampDelta)...)
+
+        // Offset delta (varint)
+        offsetDelta := i
+        record = append(record, encodeVarint(offsetDelta)...)
+
+        // Key length and key (varint + data)
+        record = append(record, encodeVarint(int64(len(key)))...)
+        record = append(record, key...)
+
+        // Value length and value (varint + data)
+        record = append(record, encodeVarint(int64(len(value)))...)
+        record = append(record, value...)
+
+        // Headers count (varint) - 0 headers
+        record = append(record, encodeVarint(0)...)
+
+        // Prepend record length (varint)
+        recordLength := int64(len(record))
+        batch = append(batch, encodeVarint(recordLength)...)
+        batch = append(batch, record...)
+    }
+
+    // Fill in the batch length
+    batchLength := uint32(len(batch) - batchLengthPos - 4)
+    binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength)
+
+    // Calculate CRC32 for the batch
+    // CRC is calculated over: attributes + last_offset_delta + first_timestamp + max_timestamp + producer_id + producer_epoch + base_sequence + records_count + records
+    // This starts after the CRC field (which comes after magic byte)
+    crcStartPos := crcPos + 4 // start after the CRC field
+    crcData := batch[crcStartPos:]
+    crc := crc32.ChecksumIEEE(crcData)
+    binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
+
+    return batch
+}
+
 // constructRecordBatch creates a realistic Kafka record batch that matches produced messages
 // This creates record batches that mirror what was actually stored during Produce operations
 func (h *Handler) constructRecordBatch(ledger interface{}, fetchOffset, highWaterMark int64) []byte {
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index df46f7013..1cce73e95 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -38,6 +38,10 @@ type Handler struct {
     ledgersMu sync.RWMutex
     ledgers   map[TopicPartitionKey]*offset.Ledger // topic-partition -> offset ledger
 
+    // Record batch storage for in-memory mode (for testing)
+    recordBatchMu sync.RWMutex
+    recordBatches map[string][]byte // "topic:partition:offset" -> record batch data
+
     // SeaweedMQ integration (optional, for production use)
     seaweedMQHandler *integration.SeaweedMQHandler
     useSeaweedMQ     bool
@@ -60,6 +64,7 @@ func NewHandler() *Handler {
     return &Handler{
         topics:           make(map[string]*TopicInfo),
         ledgers:          make(map[TopicPartitionKey]*offset.Ledger),
+        recordBatches:    make(map[string][]byte),
         useSeaweedMQ:     false,
         groupCoordinator: consumer.NewGroupCoordinator(),
         brokerHost:       "localhost", // default fallback
@@ -142,6 +147,23 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
     return h.ledgers[key]
 }
 
+// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
+func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
+    key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset)
+    h.recordBatchMu.Lock()
+    defer h.recordBatchMu.Unlock()
+    h.recordBatches[key] = recordBatch
+}
+
+// GetRecordBatch retrieves a stored record batch
+func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
+    key := fmt.Sprintf("%s:%d:%d", topicName, partition, offset)
+    h.recordBatchMu.RLock()
+    defer h.recordBatchMu.RUnlock()
+    batch, exists := h.recordBatches[key]
+    return batch, exists
+}
+
 // SetBrokerAddress updates the broker address used in Metadata responses
 func (h *Handler) SetBrokerAddress(host string, port int) {
     h.brokerHost = host
@@ -234,6 +256,11 @@ func (h *Handler) HandleConn(conn net.Conn) error {
         case 1: // Fetch
             fmt.Printf("DEBUG: *** FETCH HANDLER CALLED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
             response, err = h.handleFetch(correlationID, apiVersion, messageBuf[8:]) // skip header
+            if err != nil {
+                fmt.Printf("DEBUG: Fetch error: %v\n", err)
+            } else {
+                fmt.Printf("DEBUG: Fetch response hex dump (%d bytes): %x\n", len(response), response)
+            }
         case 11: // JoinGroup
             fmt.Printf("DEBUG: *** JOINGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
             response, err = h.handleJoinGroup(correlationID, apiVersion, messageBuf[8:]) // skip header
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index 019a088ad..fe9bba68b 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -81,6 +81,7 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
     response = append(response, topicsCountBytes...)
 
     // Process each topic
+    fmt.Printf("DEBUG: Produce v%d - topics count: %d\n", apiVersion, topicsCount)
     for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
         if len(requestBody) < offset+2 {
             break
@@ -165,6 +166,7 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
             currentTime := time.Now().UnixNano()
 
             fmt.Printf("DEBUG: Processing partition %d for topic '%s' (topicExists=%v)\n", partitionID, topicName, topicExists)
+            fmt.Printf("DEBUG: Record set size: %d bytes\n", recordSetSize)
 
             if !topicExists {
                 fmt.Printf("DEBUG: ERROR - Topic '%s' not found, returning UNKNOWN_TOPIC_OR_PARTITION\n", topicName)
@@ -173,6 +175,7 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
                 fmt.Printf("DEBUG: SUCCESS - Topic '%s' found, processing record set (size=%d)\n", topicName, recordSetSize)
                 // Process the record set
                 recordCount, totalSize, parseErr := h.parseRecordSet(recordSetData)
+                fmt.Printf("DEBUG: parseRecordSet result - recordCount: %d, totalSize: %d, parseErr: %v\n", recordCount, totalSize, parseErr)
                 if parseErr != nil {
                     errorCode = 42 // INVALID_RECORD
                 } else if recordCount > 0 {
@@ -187,13 +190,19 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
                     } else {
                         // Use legacy in-memory mode for tests
                         ledger := h.GetOrCreateLedger(topicName, int32(partitionID))
+                        fmt.Printf("DEBUG: Before AssignOffsets - HWM: %d, recordCount: %d\n", ledger.GetHighWaterMark(), recordCount)
                         baseOffset = ledger.AssignOffsets(int64(recordCount))
+                        fmt.Printf("DEBUG: After AssignOffsets - HWM: %d, baseOffset: %d\n", ledger.GetHighWaterMark(), baseOffset)
 
                         // Append each record to the ledger
                         avgSize := totalSize / recordCount
                         for k := int64(0); k < int64(recordCount); k++ {
-                            ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize)
+                            err := ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize)
+                            if err != nil {
+                                fmt.Printf("DEBUG: AppendRecord error: %v\n", err)
+                            }
                         }
+                        fmt.Printf("DEBUG: After AppendRecord - HWM: %d, entries: %d\n", ledger.GetHighWaterMark(), len(ledger.GetEntries()))
                     }
                 }
             }
@@ -423,14 +432,76 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
             recordSetSize := binary.BigEndian.Uint32(requestBody[offset : offset+4])
             offset += 4
 
-            // Skip the record set data for now (we'll implement proper parsing later)
+            // Extract record set data for processing
             if len(requestBody) < offset+int(recordSetSize) {
                 break
             }
+            recordSetData := requestBody[offset : offset+int(recordSetSize)]
             offset += int(recordSetSize)
 
             fmt.Printf("DEBUG: Produce v%d - partition: %d, record_set_size: %d\n", apiVersion, partitionID, recordSetSize)
 
+            // Process the record set and store in ledger
+            var errorCode uint16 = 0
+            var baseOffset int64 = 0
+            currentTime := time.Now().UnixNano()
+
+            // Check if topic exists, auto-create if it doesn't
+            h.topicsMu.Lock()
+            _, topicExists := h.topics[topicName]
+            if !topicExists {
+                fmt.Printf("DEBUG: Auto-creating topic during Produce v%d: %s\n", apiVersion, topicName)
+                h.topics[topicName] = &TopicInfo{
+                    Name:       topicName,
+                    Partitions: 1, // Default to 1 partition
+                    CreatedAt:  time.Now().UnixNano(),
+                }
+                // Initialize ledger for partition 0
+                h.GetOrCreateLedger(topicName, 0)
+                topicExists = true
+            }
+            h.topicsMu.Unlock()
+
+            if !topicExists {
+                errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
+            } else {
+                // Process the record set
+                recordCount, totalSize, parseErr := h.parseRecordSet(recordSetData)
+                fmt.Printf("DEBUG: Produce v%d parseRecordSet result - recordCount: %d, totalSize: %d, parseErr: %v\n", apiVersion, recordCount, totalSize, parseErr)
+                if parseErr != nil {
+                    errorCode = 42 // INVALID_RECORD
+                } else if recordCount > 0 {
+                    if h.useSeaweedMQ {
+                        // Use SeaweedMQ integration for production
+                        offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
+                        if err != nil {
+                            errorCode = 1 // UNKNOWN_SERVER_ERROR
+                        } else {
+                            baseOffset = offset
+                        }
+                    } else {
+                        // Use legacy in-memory mode for tests
+                        ledger := h.GetOrCreateLedger(topicName, int32(partitionID))
+                        fmt.Printf("DEBUG: Produce v%d Before AssignOffsets - HWM: %d, recordCount: %d\n", apiVersion, ledger.GetHighWaterMark(), recordCount)
+                        baseOffset = ledger.AssignOffsets(int64(recordCount))
+                        fmt.Printf("DEBUG: Produce v%d After AssignOffsets - HWM: %d, baseOffset: %d\n", apiVersion, ledger.GetHighWaterMark(), baseOffset)
+
+                        // Store the actual record batch data for Fetch operations
+                        h.StoreRecordBatch(topicName, int32(partitionID), baseOffset, recordSetData)
+
+                        // Append each record to the ledger
+                        avgSize := totalSize / recordCount
+                        for k := int64(0); k < int64(recordCount); k++ {
+                            err := ledger.AppendRecord(baseOffset+k, currentTime+k*1000, avgSize)
+                            if err != nil {
+                                fmt.Printf("DEBUG: Produce v%d AppendRecord error: %v\n", apiVersion, err)
+                            }
+                        }
+                        fmt.Printf("DEBUG: Produce v%d After AppendRecord - HWM: %d, entries: %d\n", apiVersion, ledger.GetHighWaterMark(), len(ledger.GetEntries()))
+                    }
+                }
+            }
+
             // Build correct Produce v2+ response for this partition
             // Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]
 
@@ -439,12 +510,10 @@ func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, r
             binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
             response = append(response, partitionIDBytes...)
 
-            // error_code (2 bytes) - 0 = success
-            response = append(response, 0, 0)
+            // error_code (2 bytes)
+            response = append(response, byte(errorCode>>8), byte(errorCode))
 
-            // base_offset (8 bytes) - offset of first message (stubbed)
-            currentTime := time.Now().UnixNano()
-            baseOffset := currentTime / 1000000 // Use timestamp as offset for now
+            // base_offset (8 bytes) - offset of first message
             baseOffsetBytes := make([]byte, 8)
             binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
             response = append(response, baseOffsetBytes...)
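For reviewers, a minimal sketch (not part of the patch) of how the new in-memory record batch store is meant to round-trip Produce data back through Fetch. StoreRecordBatch and GetRecordBatch are the helpers added in handler.go above; the import path and topic name below are assumed for illustration only.

package main

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
)

func main() {
    h := protocol.NewHandler()

    // Produce v2+ path: the raw record batch bytes are stored keyed by
    // topic, partition, and base offset, exactly as received from the client.
    batch := []byte{ /* opaque Kafka record batch (magic v2) bytes */ }
    h.StoreRecordBatch("test-topic", 0, 0, batch)

    // Fetch path: a stored batch is returned verbatim when the fetch offset
    // matches a stored base offset; otherwise the handler falls back to the
    // synthetic constructSimpleRecordBatch output.
    if stored, ok := h.GetRecordBatch("test-topic", 0, 0); ok {
        fmt.Printf("returning stored batch (%d bytes)\n", len(stored))
    }
}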