diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index de81f3568..0fa268c1a 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -80,7 +80,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo response = append(response, highWaterMarkBytes...) // Log start offset (8 bytes) - 0 for simplicity response = append(response, 0, 0, 0, 0, 0, 0, 0, 0) - + // Aborted transactions count (4 bytes) = 0 response = append(response, 0, 0, 0, 0) } @@ -682,7 +682,7 @@ func encodeVarint(value int64) []byte { // getMultipleRecordBatches retrieves and combines multiple record batches starting from the given offset func (h *Handler) getMultipleRecordBatches(topicName string, partitionID int32, startOffset, highWaterMark int64) []byte { var combinedBatch []byte - + // Try to get all available record batches from startOffset to highWaterMark-1 for offset := startOffset; offset < highWaterMark; offset++ { if batch, exists := h.GetRecordBatch(topicName, partitionID, offset); exists { @@ -697,7 +697,7 @@ func (h *Handler) getMultipleRecordBatches(topicName string, partitionID int32, } } } - + return combinedBatch } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 662490c7d..522e6d15a 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -161,42 +161,42 @@ func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) { h.recordBatchMu.RLock() defer h.recordBatchMu.RUnlock() - + fmt.Printf("DEBUG: GetRecordBatch - looking for topic=%s, partition=%d, offset=%d\n", topicName, partition, offset) fmt.Printf("DEBUG: Available record batches: %d\n", len(h.recordBatches)) - + // Look for a record batch that contains this offset // Record batches are stored by their base offset, but may contain multiple records topicPartitionPrefix := fmt.Sprintf("%s:%d:", topicName, partition) - + for key, batch := range h.recordBatches { fmt.Printf("DEBUG: Checking key: %s\n", key) if !strings.HasPrefix(key, topicPartitionPrefix) { continue } - + // Extract the base offset from the key parts := strings.Split(key, ":") if len(parts) != 3 { continue } - + baseOffset, err := strconv.ParseInt(parts[2], 10, 64) if err != nil { continue } - + // Check if this batch could contain the requested offset // We need to parse the batch to determine how many records it contains recordCount := h.getRecordCountFromBatch(batch) fmt.Printf("DEBUG: Batch key=%s, baseOffset=%d, recordCount=%d, requested offset=%d\n", key, baseOffset, recordCount, offset) - + if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) return batch, true } } - + fmt.Printf("DEBUG: No matching batch found for offset %d\n", offset) return nil, false } @@ -204,15 +204,15 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64 // getRecordCountFromBatch extracts the record count from a Kafka record batch func (h *Handler) getRecordCountFromBatch(batch []byte) int32 { // Kafka record batch format: - // base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) + + // base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) + // attributes (2) + last_offset_delta (4) + first_timestamp (8) + max_timestamp (8) + // producer_id (8) + producer_epoch (2) + base_sequence (4) + records_count (4) + records... - + // The record count is at offset 57 (8+4+4+1+4+2+4+8+8+8+2+4 = 57) if len(batch) < 61 { // 57 + 4 bytes for record count return 0 } - + recordCount := binary.BigEndian.Uint32(batch[57:61]) return int32(recordCount) } @@ -269,7 +269,7 @@ func (h *Handler) HandleConn(conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - + fmt.Printf("DEBUG: API Request - Key: %d, Version: %d, Correlation: %d\n", apiKey, apiVersion, correlationID) apiName := getAPIName(apiKey) @@ -465,6 +465,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 4) // max version 4 + fmt.Printf("DEBUG: ApiVersions v0 response: %d bytes\n", len(response)) return response, nil } @@ -851,6 +852,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( } response := buf.Bytes() + fmt.Printf("DEBUG: Metadata v4 response size: %d bytes, hex: %x\n", len(response), response) return response, nil } @@ -1128,15 +1130,23 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) offset := 2 + int(clientIDSize) - // ListOffsets v2+ has additional fields: replica_id(4) + isolation_level(1) - if apiVersion >= 2 { - if len(requestBody) < offset+5 { - return nil, fmt.Errorf("ListOffsets v%d request missing replica_id/isolation_level", apiVersion) + // ListOffsets v1+ has replica_id(4), v2+ adds isolation_level(1) + if apiVersion >= 1 { + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("ListOffsets v%d request missing replica_id", apiVersion) } replicaID := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) - isolationLevel := requestBody[offset+4] - offset += 5 - fmt.Printf("DEBUG: ListOffsets v%d - replica_id: %d, isolation_level: %d\n", apiVersion, replicaID, isolationLevel) + offset += 4 + fmt.Printf("DEBUG: ListOffsets v%d - replica_id: %d\n", apiVersion, replicaID) + + if apiVersion >= 2 { + if len(requestBody) < offset+1 { + return nil, fmt.Errorf("ListOffsets v%d request missing isolation_level", apiVersion) + } + isolationLevel := requestBody[offset] + offset += 1 + fmt.Printf("DEBUG: ListOffsets v%d - isolation_level: %d\n", apiVersion, isolationLevel) + } } if len(requestBody) < offset+4 { @@ -1256,7 +1266,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req } } - fmt.Printf("DEBUG: ListOffsets v%d response: %d bytes\n", apiVersion, len(response)) + fmt.Printf("DEBUG: ListOffsets v%d response: %d bytes, hex: %x\n", apiVersion, len(response), response) return response, nil }