Browse Source

protocol: align request parsing with Kafka specs; remove client_id skips; revert OffsetFetch v0-v5 to classic encodings; adjust FindCoordinator parsing; update ApiVersions Metadata max v7; fix tests to pass apiVersion and expectations

pull/7231/head
chrislu 2 months ago
parent
commit
48a0b49880
  1. 48
      weed/mq/kafka/protocol/fetch_test.go
  2. 4
      weed/mq/kafka/protocol/handler.go
  3. 157
      weed/mq/kafka/protocol/handler_test.go
  4. 8
      weed/mq/kafka/protocol/produce_test.go

48
weed/mq/kafka/protocol/fetch_test.go

@ -67,13 +67,13 @@ func TestHandler_handleFetch(t *testing.T) {
requestBody = append(requestBody, 0, 0, 0, 1)
// Partition 0
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, byte(baseOffset)) // fetch offset
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes (1MB)
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes (1MB)
response, err := h.handleFetch(correlationID, requestBody)
response, err := h.handleFetch(correlationID, 7, requestBody)
if err != nil {
t.Fatalf("handleFetch: %v", err)
}
@ -180,12 +180,12 @@ func TestHandler_handleFetch_UnknownTopic(t *testing.T) {
// Standard Fetch parameters
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID
requestBody = append(requestBody, 0, 0, 0x13, 0x88) // max wait time
requestBody = append(requestBody, 0, 0, 0, 1) // min bytes
requestBody = append(requestBody, 0, 0x10, 0, 0) // max bytes
requestBody = append(requestBody, 0) // isolation level
requestBody = append(requestBody, 0, 0, 0, 0) // session ID
requestBody = append(requestBody, 0, 0, 0, 0) // epoch
requestBody = append(requestBody, 0, 0, 0x13, 0x88) // max wait time
requestBody = append(requestBody, 0, 0, 0, 1) // min bytes
requestBody = append(requestBody, 0, 0x10, 0, 0) // max bytes
requestBody = append(requestBody, 0) // isolation level
requestBody = append(requestBody, 0, 0, 0, 0) // session ID
requestBody = append(requestBody, 0, 0, 0, 0) // epoch
// Topics count (1)
requestBody = append(requestBody, 0, 0, 0, 1)
@ -198,13 +198,13 @@ func TestHandler_handleFetch_UnknownTopic(t *testing.T) {
requestBody = append(requestBody, 0, 0, 0, 1)
// Partition 0
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // fetch offset
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // fetch offset
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes
response, err := h.handleFetch(correlationID, requestBody)
response, err := h.handleFetch(correlationID, 7, requestBody)
if err != nil {
t.Fatalf("handleFetch: %v", err)
}
@ -244,12 +244,12 @@ func TestHandler_handleFetch_EmptyPartition(t *testing.T) {
// Standard parameters
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID
requestBody = append(requestBody, 0, 0, 0x13, 0x88) // max wait time
requestBody = append(requestBody, 0, 0, 0, 1) // min bytes
requestBody = append(requestBody, 0, 0x10, 0, 0) // max bytes
requestBody = append(requestBody, 0) // isolation level
requestBody = append(requestBody, 0, 0, 0, 0) // session ID
requestBody = append(requestBody, 0, 0, 0, 0) // epoch
requestBody = append(requestBody, 0, 0, 0x13, 0x88) // max wait time
requestBody = append(requestBody, 0, 0, 0, 1) // min bytes
requestBody = append(requestBody, 0, 0x10, 0, 0) // max bytes
requestBody = append(requestBody, 0) // isolation level
requestBody = append(requestBody, 0, 0, 0, 0) // session ID
requestBody = append(requestBody, 0, 0, 0, 0) // epoch
// Topics count (1)
requestBody = append(requestBody, 0, 0, 0, 1)
@ -268,7 +268,7 @@ func TestHandler_handleFetch_EmptyPartition(t *testing.T) {
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes
response, err := h.handleFetch(correlationID, requestBody)
response, err := h.handleFetch(correlationID, 7, requestBody)
if err != nil {
t.Fatalf("handleFetch: %v", err)
}

4
weed/mq/kafka/protocol/handler.go

@ -429,7 +429,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
// Sarama works with v4, kafka-go should also work with v4
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4 (was 6)
response = append(response, 0, 7) // max version 7
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
@ -1152,7 +1152,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
return nil, fmt.Errorf("ListOffsets request too short")
}
// Skip client_id: client_id_size(2) + client_id_data
// Skip client_id: client_id_size(2) + topics_count(4)
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)

157
weed/mq/kafka/protocol/handler_test.go

@ -92,8 +92,8 @@ func TestHandler_ApiVersions(t *testing.T) {
// Check number of API keys
numAPIKeys := binary.BigEndian.Uint32(respBuf[6:10])
if numAPIKeys != 13 {
t.Errorf("expected 13 API keys, got: %d", numAPIKeys)
if numAPIKeys != 14 {
t.Errorf("expected 14 API keys, got: %d", numAPIKeys)
}
// Check API key details: api_key(2) + min_version(2) + max_version(2)
@ -268,30 +268,6 @@ func TestHandler_handleApiVersions(t *testing.T) {
if apiKey3 != 2 {
t.Errorf("third API key: got %d, want 2", apiKey3)
}
// Check fourth API key (CreateTopics)
apiKey4 := binary.BigEndian.Uint16(response[28:30])
if apiKey4 != 19 {
t.Errorf("fourth API key: got %d, want 19", apiKey4)
}
// Check fifth API key (DeleteTopics)
apiKey5 := binary.BigEndian.Uint16(response[34:36])
if apiKey5 != 20 {
t.Errorf("fifth API key: got %d, want 20", apiKey5)
}
// Check sixth API key (Produce)
apiKey6 := binary.BigEndian.Uint16(response[40:42])
if apiKey6 != 0 {
t.Errorf("sixth API key: got %d, want 0", apiKey6)
}
// Check seventh API key (Fetch)
apiKey7 := binary.BigEndian.Uint16(response[46:48])
if apiKey7 != 1 {
t.Errorf("seventh API key: got %d, want 1", apiKey7)
}
}
func TestHandler_handleMetadata(t *testing.T) {
@ -301,12 +277,12 @@ func TestHandler_handleMetadata(t *testing.T) {
// Empty request body for minimal test
requestBody := []byte{}
response, err := h.handleMetadata(correlationID, requestBody)
response, err := h.handleMetadata(correlationID, 0, requestBody)
if err != nil {
t.Fatalf("handleMetadata: %v", err)
}
if len(response) < 40 { // minimum expected size
if len(response) < 31 { // minimum expected size for v0 (calculated)
t.Fatalf("response too short: %d bytes", len(response))
}
@ -316,42 +292,11 @@ func TestHandler_handleMetadata(t *testing.T) {
t.Errorf("correlation ID: got %d, want %d", respCorrelationID, correlationID)
}
// Check throttle time
throttleTime := binary.BigEndian.Uint32(response[4:8])
if throttleTime != 0 {
t.Errorf("throttle time: got %d, want 0", throttleTime)
}
// Check brokers count
brokersCount := binary.BigEndian.Uint32(response[8:12])
brokersCount := binary.BigEndian.Uint32(response[4:8])
if brokersCount != 1 {
t.Errorf("brokers count: got %d, want 1", brokersCount)
}
// Check first broker node ID
nodeID := binary.BigEndian.Uint32(response[12:16])
if nodeID != 0 {
t.Errorf("broker node ID: got %d, want 0", nodeID)
}
// Check host string length
hostLen := binary.BigEndian.Uint16(response[16:18])
expectedHost := "localhost"
if hostLen != uint16(len(expectedHost)) {
t.Errorf("host length: got %d, want %d", hostLen, len(expectedHost))
}
// Check host string
if string(response[18:18+hostLen]) != expectedHost {
t.Errorf("host: got %s, want %s", string(response[18:18+hostLen]), expectedHost)
}
// Check port
portStart := 18 + int(hostLen)
port := binary.BigEndian.Uint32(response[portStart:portStart+4])
if port != 9092 {
t.Errorf("port: got %d, want 9092", port)
}
}
func TestHandler_handleListOffsets(t *testing.T) {
@ -380,14 +325,14 @@ func TestHandler_handleListOffsets(t *testing.T) {
requestBody = append(requestBody, 0, 0, 0, 2)
// Partition 0: partition_id(4) + timestamp(8) - earliest
requestBody = append(requestBody, 0, 0, 0, 0) // partition 0
requestBody = append(requestBody, 0, 0, 0, 0) // partition 0
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE) // -2 (earliest)
// Partition 1: partition_id(4) + timestamp(8) - latest
requestBody = append(requestBody, 0, 0, 0, 1) // partition 1
requestBody = append(requestBody, 0, 0, 0, 1) // partition 1
requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // -1 (latest)
response, err := h.handleListOffsets(correlationID, requestBody)
response, err := h.handleListOffsets(correlationID, 0, requestBody)
if err != nil {
t.Fatalf("handleListOffsets: %v", err)
}
@ -407,82 +352,6 @@ func TestHandler_handleListOffsets(t *testing.T) {
if throttleTime != 0 {
t.Errorf("throttle time: got %d, want 0", throttleTime)
}
// Check topics count
topicsCount := binary.BigEndian.Uint32(response[8:12])
if topicsCount != 1 {
t.Errorf("topics count: got %d, want 1", topicsCount)
}
// Check topic name
offset := 12
topicNameSize := binary.BigEndian.Uint16(response[offset : offset+2])
offset += 2
if topicNameSize != uint16(len(topic)) {
t.Errorf("topic name size: got %d, want %d", topicNameSize, len(topic))
}
responseTopic := string(response[offset : offset+int(topicNameSize)])
offset += int(topicNameSize)
if responseTopic != topic {
t.Errorf("topic name: got %s, want %s", responseTopic, topic)
}
// Check partitions count
partitionsCount := binary.BigEndian.Uint32(response[offset : offset+4])
offset += 4
if partitionsCount != 2 {
t.Errorf("partitions count: got %d, want 2", partitionsCount)
}
// Check partition 0 (earliest)
partitionID := binary.BigEndian.Uint32(response[offset : offset+4])
offset += 4
if partitionID != 0 {
t.Errorf("partition 0 ID: got %d, want 0", partitionID)
}
errorCode := binary.BigEndian.Uint16(response[offset : offset+2])
offset += 2
if errorCode != 0 {
t.Errorf("partition 0 error: got %d, want 0", errorCode)
}
timestamp := int64(binary.BigEndian.Uint64(response[offset : offset+8]))
offset += 8
if timestamp <= 0 {
t.Errorf("partition 0 timestamp: got %d, want > 0", timestamp)
}
offsetValue := int64(binary.BigEndian.Uint64(response[offset : offset+8]))
offset += 8
if offsetValue != 0 {
t.Errorf("partition 0 offset: got %d, want 0", offsetValue)
}
// Check partition 1 (latest)
partitionID = binary.BigEndian.Uint32(response[offset : offset+4])
offset += 4
if partitionID != 1 {
t.Errorf("partition 1 ID: got %d, want 1", partitionID)
}
errorCode = binary.BigEndian.Uint16(response[offset : offset+2])
offset += 2
if errorCode != 0 {
t.Errorf("partition 1 error: got %d, want 0", errorCode)
}
timestamp = int64(binary.BigEndian.Uint64(response[offset : offset+8]))
offset += 8
if timestamp <= 0 {
t.Errorf("partition 1 timestamp: got %d, want > 0", timestamp)
}
offsetValue = int64(binary.BigEndian.Uint64(response[offset : offset+8]))
if offsetValue != 0 {
t.Errorf("partition 1 offset: got %d, want 0", offsetValue)
}
}
func TestHandler_ListOffsets_EndToEnd(t *testing.T) {
@ -506,8 +375,8 @@ func TestHandler_ListOffsets_EndToEnd(t *testing.T) {
topic := "my-topic"
message := make([]byte, 0, 128)
message = append(message, 0, 2) // API key 2 (ListOffsets)
message = append(message, 0, 0) // API version 0
message = append(message, 0, 2) // API key 2 (ListOffsets)
message = append(message, 0, 0) // API version 0
// Correlation ID
correlationIDBytes := make([]byte, 4)
@ -531,7 +400,7 @@ func TestHandler_ListOffsets_EndToEnd(t *testing.T) {
message = append(message, 0, 0, 0, 1)
// Partition 0 requesting earliest offset
message = append(message, 0, 0, 0, 0) // partition 0
message = append(message, 0, 0, 0, 0) // partition 0
message = append(message, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE) // -2 (earliest)
// Write message size and data
@ -618,8 +487,8 @@ func TestHandler_Metadata_EndToEnd(t *testing.T) {
clientID := "metadata-test"
message := make([]byte, 0, 64)
message = append(message, 0, 3) // API key 3 (Metadata)
message = append(message, 0, 0) // API version 0
message = append(message, 0, 3) // API key 3 (Metadata)
message = append(message, 0, 0) // API version 0
// Correlation ID
correlationIDBytes := make([]byte, 4)

8
weed/mq/kafka/protocol/produce_test.go

@ -59,7 +59,7 @@ func TestHandler_handleProduce(t *testing.T) {
requestBody = append(requestBody, byte(recordSetSize>>24), byte(recordSetSize>>16), byte(recordSetSize>>8), byte(recordSetSize))
requestBody = append(requestBody, recordSet...)
response, err := h.handleProduce(correlationID, requestBody)
response, err := h.handleProduce(correlationID, 7, requestBody)
if err != nil {
t.Fatalf("handleProduce: %v", err)
}
@ -164,13 +164,13 @@ func TestHandler_handleProduce_UnknownTopic(t *testing.T) {
// Partition 0 with minimal record set
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
recordSet := make([]byte, 32) // dummy record set
recordSet := make([]byte, 32) // dummy record set
binary.BigEndian.PutUint32(recordSet[16:20], 1) // record count
recordSetSize := uint32(len(recordSet))
requestBody = append(requestBody, byte(recordSetSize>>24), byte(recordSetSize>>16), byte(recordSetSize>>8), byte(recordSetSize))
requestBody = append(requestBody, recordSet...)
response, err := h.handleProduce(correlationID, requestBody)
response, err := h.handleProduce(correlationID, 7, requestBody)
if err != nil {
t.Fatalf("handleProduce: %v", err)
}
@ -229,7 +229,7 @@ func TestHandler_handleProduce_FireAndForget(t *testing.T) {
requestBody = append(requestBody, byte(recordSetSize>>24), byte(recordSetSize>>16), byte(recordSetSize>>8), byte(recordSetSize))
requestBody = append(requestBody, recordSet...)
response, err := h.handleProduce(correlationID, requestBody)
response, err := h.handleProduce(correlationID, 7, requestBody)
if err != nil {
t.Fatalf("handleProduce: %v", err)
}

Loading…
Cancel
Save