diff --git a/weed/mq/kafka/API_VERSION_MATRIX.md b/weed/mq/kafka/API_VERSION_MATRIX.md new file mode 100644 index 000000000..80457102e --- /dev/null +++ b/weed/mq/kafka/API_VERSION_MATRIX.md @@ -0,0 +1,80 @@ +# Kafka API Version Matrix Audit + +## Summary +This document audits the advertised API versions in `handleApiVersions()` against actual implementation support and identifies mismatches that need correction. + +## Current Status: MISMATCHES FOUND ⚠️ + +### API Version Discrepancies + +| API Key | API Name | Advertised | Actually Implemented | Status | Action Needed | +|---------|----------|------------|---------------------|--------|---------------| +| 18 | ApiVersions | v0-v3 | v0-v3 | ✅ Match | None | +| 3 | Metadata | v0-v7 | v0-v7 | ✅ Match | None | +| 0 | Produce | v0-v7 | v0-v7 | ✅ Match | None | +| 1 | Fetch | v0-v7 | v0-v7 | ✅ Match | None | +| 2 | ListOffsets | v0-v2 | v0-v2 | ✅ Match | None | +| 19 | CreateTopics | v0-v4 | v0-v5 | ❌ Mismatch | Update advertised to v0-v5 | +| 20 | DeleteTopics | v0-v4 | v0-v4 | ✅ Match | None | +| 11 | JoinGroup | v0-v7 | v0-v7 | ✅ Match | None | +| 14 | SyncGroup | v0-v5 | v0-v5 | ✅ Match | None | +| 8 | OffsetCommit | v0-v2 | v0-v2 | ✅ Match | None | +| 9 | OffsetFetch | v0-v2 | v0-v5+ | ❌ MAJOR Mismatch | Update advertised to v0-v5 | +| 10 | FindCoordinator | v0-v2 | v0-v2 | ✅ Match | None | +| 12 | Heartbeat | v0-v4 | v0-v4 | ✅ Match | None | +| 13 | LeaveGroup | v0-v4 | v0-v4 | ✅ Match | None | + +## Detailed Analysis + +### 1. OffsetFetch API (Key 9) - CRITICAL MISMATCH +- **Advertised**: v0-v2 (max version 2) +- **Actually Implemented**: Up to v5+ + - **Evidence**: `buildOffsetFetchResponse()` includes `if apiVersion >= 5` for leader epoch + - **Evidence**: `if apiVersion >= 3` for throttle time +- **Impact**: Clients that honor the advertised range cap OffsetFetch at v2, so the v3-v5 features the handler already supports (throttle_time_ms, leader_epoch) go unused +- **Action**: Update advertised max version from 2 to 5 + +### 2. 
CreateTopics API (Key 19) - MINOR MISMATCH +- **Advertised**: v0-v4 (max version 4) +- **Actually Implemented**: v0-v5 + - **Evidence**: `handleCreateTopics()` routes v5 requests to `handleCreateTopicsV2Plus()` + - **Evidence**: Tests validate v0-v5 versions +- **Impact**: Clients that want CreateTopics v5 see only v4 advertised, so they either fall back to v4 or, if they require v5, fail version negotiation +- **Action**: Update advertised max version from 4 to 5 + +### 3. Validation vs Advertisement Inconsistency +The `validateAPIVersion()` function mirrors the advertised versions, which means it will incorrectly reject v3-v5 OffsetFetch requests and v5 CreateTopics requests that the handlers can actually process. + +## Implementation Details + +### OffsetFetch Version Features: +- **v0-v2**: Basic offset fetch +- **v3+**: Includes throttle_time_ms +- **v5+**: Includes leader_epoch for each partition + +### CreateTopics Version Features: +- **v0-v1**: Regular array format +- **v2-v5**: Compact array format, tagged fields (as parsed by the handler's V2Plus path; NOTE: upstream Kafka switches CreateTopics to the compact/flexible encoding only at v5, so v2-v4 handling should be verified against real clients) + +## Recommendations + +1. **Immediate Fix**: Update `handleApiVersions()` to advertise correct max versions +2. **Consistency Check**: Update `validateAPIVersion()` to match the corrected advertised versions +3. **Testing**: Verify that higher version clients can successfully connect and use advanced features +4. **Documentation**: Update any client guidance to reflect the correct supported versions + +## Test Verification Needed + +After fixes: +1. Test OffsetFetch v3, v4, v5 with real Kafka clients +2. Test CreateTopics v5 with real Kafka clients +3. Verify throttle_time_ms and leader_epoch are correctly populated +4. Ensure version validation doesn't reject valid higher version requests + +## Conservative Alternative + +If we want to be conservative and not advertise versions we haven't thoroughly tested: +- **OffsetFetch**: Limit to v3 (throttle time support) instead of v5 +- **CreateTopics**: Keep at v4 unless v5 is specifically needed + +This would still fix the main discrepancy while being more cautious about untested version features. 
diff --git a/weed/mq/kafka/protocol/api_versions_test.go b/weed/mq/kafka/protocol/api_versions_test.go new file mode 100644 index 000000000..fb979088e --- /dev/null +++ b/weed/mq/kafka/protocol/api_versions_test.go @@ -0,0 +1,311 @@ +package protocol + +import ( + "encoding/binary" + "fmt" + "testing" +) + +func TestApiVersions_AdvertisedVersionsMatch(t *testing.T) { + handler := NewTestHandler() + defer handler.Close() + + response, err := handler.handleApiVersions(12345) + if err != nil { + t.Fatalf("handleApiVersions failed: %v", err) + } + + if len(response) < 10 { + t.Fatalf("Response too short: %d bytes", len(response)) + } + + // Check correlation ID + correlationID := binary.BigEndian.Uint32(response[0:4]) + if correlationID != 12345 { + t.Errorf("Expected correlation ID 12345, got %d", correlationID) + } + + // Check error code + errorCode := binary.BigEndian.Uint16(response[4:6]) + if errorCode != 0 { + t.Errorf("Expected error code 0, got %d", errorCode) + } + + // Check number of API keys + numAPIKeys := binary.BigEndian.Uint32(response[6:10]) + expectedAPIKeys := uint32(14) + if numAPIKeys != expectedAPIKeys { + t.Errorf("Expected %d API keys, got %d", expectedAPIKeys, numAPIKeys) + } + + // Parse and verify specific API versions + offset := 10 + apiVersionMap := make(map[uint16][2]uint16) // apiKey -> [minVersion, maxVersion] + + for i := uint32(0); i < numAPIKeys && offset+6 <= len(response); i++ { + apiKey := binary.BigEndian.Uint16(response[offset : offset+2]) + minVersion := binary.BigEndian.Uint16(response[offset+2 : offset+4]) + maxVersion := binary.BigEndian.Uint16(response[offset+4 : offset+6]) + offset += 6 + + apiVersionMap[apiKey] = [2]uint16{minVersion, maxVersion} + } + + // Verify critical corrected versions + expectedVersions := map[uint16][2]uint16{ + 9: {0, 5}, // OffsetFetch: should now be v0-v5 + 19: {0, 5}, // CreateTopics: should now be v0-v5 + 3: {0, 7}, // Metadata: should be v0-v7 + 18: {0, 3}, // ApiVersions: should be v0-v3 + 
} + + for apiKey, expected := range expectedVersions { + if actual, exists := apiVersionMap[apiKey]; exists { + if actual[0] != expected[0] || actual[1] != expected[1] { + t.Errorf("API %d version mismatch: expected v%d-v%d, got v%d-v%d", + apiKey, expected[0], expected[1], actual[0], actual[1]) + } + } else { + t.Errorf("API %d not found in response", apiKey) + } + } +} + +func TestValidateAPIVersion_UpdatedVersions(t *testing.T) { + handler := NewTestHandler() + defer handler.Close() + + testCases := []struct { + name string + apiKey uint16 + version uint16 + shouldErr bool + }{ + // OffsetFetch - should now support up to v5 + {"OffsetFetch v2", 9, 2, false}, + {"OffsetFetch v3", 9, 3, false}, // Was rejected before, should work now + {"OffsetFetch v4", 9, 4, false}, // Was rejected before, should work now + {"OffsetFetch v5", 9, 5, false}, // Was rejected before, should work now + {"OffsetFetch v6", 9, 6, true}, // Should still be rejected + + // CreateTopics - should now support up to v5 + {"CreateTopics v4", 19, 4, false}, + {"CreateTopics v5", 19, 5, false}, // Was rejected before, should work now + {"CreateTopics v6", 19, 6, true}, // Should be rejected + + // Metadata - should still support up to v7 + {"Metadata v7", 3, 7, false}, + {"Metadata v8", 3, 8, true}, + + // ApiVersions - should still support up to v3 + {"ApiVersions v3", 18, 3, false}, + {"ApiVersions v4", 18, 4, true}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := handler.validateAPIVersion(tc.apiKey, tc.version) + if tc.shouldErr && err == nil { + t.Errorf("Expected error for API %d version %d, but got none", tc.apiKey, tc.version) + } + if !tc.shouldErr && err != nil { + t.Errorf("Unexpected error for API %d version %d: %v", tc.apiKey, tc.version, err) + } + }) + } +} + +func TestOffsetFetch_HigherVersionSupport(t *testing.T) { + handler := NewTestHandler() + defer handler.Close() + + // Test that OffsetFetch v3, v4, v5 are now accepted + versions := 
[]uint16{3, 4, 5} + + for _, version := range versions { + t.Run(fmt.Sprintf("OffsetFetch_v%d", version), func(t *testing.T) { + // Create a basic OffsetFetch request + requestBody := make([]byte, 0, 64) + + // Group ID (string: "test-group") + groupID := "test-group" + requestBody = append(requestBody, 0, byte(len(groupID))) // length + requestBody = append(requestBody, []byte(groupID)...) // group ID + + // Topics array (1 topic) + requestBody = append(requestBody, 0, 0, 0, 1) // topics count + + // Topic: "test-topic" + topicName := "test-topic" + requestBody = append(requestBody, 0, byte(len(topicName))) // topic name length + requestBody = append(requestBody, []byte(topicName)...) // topic name + + // Partitions array (1 partition) + requestBody = append(requestBody, 0, 0, 0, 1) // partitions count + requestBody = append(requestBody, 0, 0, 0, 0) // partition 0 + + // Call handler with the higher version + response, err := handler.handleOffsetFetch(12345, version, requestBody) + + if err != nil { + t.Fatalf("OffsetFetch v%d failed: %v", version, err) + } + + if len(response) < 8 { + t.Fatalf("OffsetFetch v%d response too short: %d bytes", version, len(response)) + } + + // Check correlation ID + correlationID := binary.BigEndian.Uint32(response[0:4]) + if correlationID != 12345 { + t.Errorf("OffsetFetch v%d: expected correlation ID 12345, got %d", version, correlationID) + } + }) + } +} + +func TestCreateTopics_V5Support(t *testing.T) { + handler := NewTestHandler() + defer handler.Close() + + // Test that CreateTopics v5 is now accepted and works + requestBody := make([]byte, 0, 128) + + // Build v5 request (compact format) + // Topics array (compact: 1 topic = 2) + requestBody = append(requestBody, 0x02) + + // Topic: "v5-test-topic" + topicName := "v5-test-topic" + requestBody = append(requestBody, byte(len(topicName)+1)) // Compact string length + requestBody = append(requestBody, []byte(topicName)...) 
// Topic name + + // num_partitions = 2 + requestBody = append(requestBody, 0x00, 0x00, 0x00, 0x02) + + // replication_factor = 1 + requestBody = append(requestBody, 0x00, 0x01) + + // configs array (compact: empty = 0) + requestBody = append(requestBody, 0x00) + + // tagged fields (empty) + requestBody = append(requestBody, 0x00) + + // timeout_ms = 5000 + requestBody = append(requestBody, 0x00, 0x00, 0x13, 0x88) + + // validate_only = false + requestBody = append(requestBody, 0x00) + + // tagged fields at end + requestBody = append(requestBody, 0x00) + + // Call handler with v5 + response, err := handler.handleCreateTopics(12346, 5, requestBody) + + if err != nil { + t.Fatalf("CreateTopics v5 failed: %v", err) + } + + if len(response) < 8 { + t.Fatalf("CreateTopics v5 response too short: %d bytes", len(response)) + } + + // Check correlation ID + correlationID := binary.BigEndian.Uint32(response[0:4]) + if correlationID != 12346 { + t.Errorf("CreateTopics v5: expected correlation ID 12346, got %d", correlationID) + } + + // Verify topic was created + if !handler.seaweedMQHandler.TopicExists("v5-test-topic") { + t.Error("CreateTopics v5: topic was not created") + } +} + +// Benchmark to ensure version validation is efficient +func BenchmarkValidateAPIVersion(b *testing.B) { + handler := NewTestHandler() + defer handler.Close() + + // Test common API versions + testCases := []struct { + apiKey uint16 + version uint16 + }{ + {9, 3}, // OffsetFetch v3 + {9, 5}, // OffsetFetch v5 + {19, 5}, // CreateTopics v5 + {3, 7}, // Metadata v7 + {18, 3}, // ApiVersions v3 + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tc := testCases[i%len(testCases)] + _ = handler.validateAPIVersion(tc.apiKey, tc.version) + } +} + +func TestApiVersions_ResponseFormat(t *testing.T) { + handler := NewTestHandler() + defer handler.Close() + + response, err := handler.handleApiVersions(99999) + if err != nil { + t.Fatalf("handleApiVersions failed: %v", err) + } + + // Verify the response 
can be parsed correctly + if len(response) < 10 { + t.Fatalf("Response too short for basic parsing") + } + + offset := 0 + + // Correlation ID (4 bytes) + correlationID := binary.BigEndian.Uint32(response[offset : offset+4]) + if correlationID != 99999 { + t.Errorf("Wrong correlation ID: expected 99999, got %d", correlationID) + } + offset += 4 + + // Error code (2 bytes) + errorCode := binary.BigEndian.Uint16(response[offset : offset+2]) + if errorCode != 0 { + t.Errorf("Non-zero error code: %d", errorCode) + } + offset += 2 + + // Number of API keys (4 bytes) + numAPIKeys := binary.BigEndian.Uint32(response[offset : offset+4]) + if numAPIKeys != 14 { + t.Errorf("Wrong number of API keys: expected 14, got %d", numAPIKeys) + } + offset += 4 + + // Verify each API key entry format (apiKey + minVer + maxVer) + for i := uint32(0); i < numAPIKeys && offset+6 <= len(response); i++ { + apiKey := binary.BigEndian.Uint16(response[offset : offset+2]) + minVersion := binary.BigEndian.Uint16(response[offset+2 : offset+4]) + maxVersion := binary.BigEndian.Uint16(response[offset+4 : offset+6]) + + // Verify minVersion <= maxVersion + if minVersion > maxVersion { + t.Errorf("API %d: invalid version range %d-%d", apiKey, minVersion, maxVersion) + } + + // Verify minVersion is typically 0 + if minVersion != 0 { + t.Errorf("API %d: unexpected min version %d (expected 0)", apiKey, minVersion) + } + + offset += 6 + } + + if offset != len(response) { + t.Errorf("Response parsing mismatch: expected %d bytes, parsed %d", len(response), offset) + } +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 864265279..32cb543e8 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -435,7 +435,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // API Key 19 (CreateTopics): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 19) // API key 19 response = append(response, 
0, 0) // min version 0 - response = append(response, 0, 4) // max version 4 + response = append(response, 0, 5) // max version 5 // API Key 20 (DeleteTopics): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 20) // API key 20 @@ -468,10 +468,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 2) // max version 2 - // API Key 9 (OffsetFetch): limit to v2 (implemented and tested) + // API Key 9 (OffsetFetch): supports up to v5 (with leader epoch and throttle time) response = append(response, 0, 9) // API key 9 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 2) // max version 2 + response = append(response, 0, 5) // max version 5 // API Key 10 (FindCoordinator): limit to v2 (implemented) response = append(response, 0, 10) // API key 10 @@ -1570,7 +1570,7 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt } } - fmt.Printf("DEBUG: CreateTopics v0/v1 - Parsed topic: %s, partitions: %d, replication: %d\n", + fmt.Printf("DEBUG: CreateTopics v0/v1 - Parsed topic: %s, partitions: %d, replication: %d\n", topicName, numPartitions, replicationFactor) // Build response for this topic @@ -1718,13 +1718,13 @@ func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { 0: {0, 7}, // Produce: v0-v7 1: {0, 7}, // Fetch: v0-v7 2: {0, 2}, // ListOffsets: v0-v2 - 19: {0, 4}, // CreateTopics: v0-v4 + 19: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation) 20: {0, 4}, // DeleteTopics: v0-v4 10: {0, 2}, // FindCoordinator: v0-v2 11: {0, 7}, // JoinGroup: v0-v7 14: {0, 5}, // SyncGroup: v0-v5 8: {0, 2}, // OffsetCommit: v0-v2 - 9: {0, 2}, // OffsetFetch: v0-v2 + 9: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation) 12: {0, 4}, // Heartbeat: v0-v4 13: {0, 4}, // LeaveGroup: v0-v4 }